介绍
PHP 连接阿里云 RocketMQ(aliyun.mq)需要通过 SDK 进行连接。官方提供的基于 HTTP 协议的 SDK 如下(截止到 2022.09.30):
安装
安装此扩展包的首选方法是通过 composer。
composer require aliyunmq/mq-http-sdk:1.0.3
配置
查看源码
创建客户端:
文件位置:src/Client/RocketmqClient.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| <?php
namespace PhpTools\Client;
use Exception; use MQ\MQClient;
/**
 * Process-wide registry of RocketMQ HTTP clients, one per distinct set of credentials.
 */
class RocketmqClient
{
    /**
     * Cached clients, keyed by a hash of endpoint + access key + secret key.
     *
     * @var MQClient[]
     */
    private static $_client = [];

    /**
     * Return the shared client for the given connection settings, creating it on first use.
     *
     * @param array $config Must contain non-empty 'endPoint', 'accessKey' and 'secretKey' entries.
     *                      Any other entries are ignored.
     *
     * @return MQClient
     *
     * @throws Exception When any of the three required settings is missing or empty.
     */
    public static function getInstance($config): MQClient
    {
        if (empty($config['endPoint']) || empty($config['accessKey']) || empty($config['secretKey'])) {
            throw new Exception('unknown endPoint/accessKey/secretKey');
        }

        // Only the values that are actually handed to MQClient take part in the cache key,
        // in a fixed order, so key order or unrelated extra entries in $config cannot
        // produce a second client for the same credentials.
        $settings = [
            'endPoint' => $config['endPoint'],
            'accessKey' => $config['accessKey'],
            'secretKey' => $config['secretKey'],
        ];
        $instanceKey = md5(serialize($settings));

        if (!isset(self::$_client[$instanceKey])) {
            // No trailing comma in the argument list: that syntax needs PHP 7.3+.
            self::$_client[$instanceKey] = new MQClient(
                $settings['endPoint'],
                $settings['accessKey'],
                $settings['secretKey']
            );
        }

        return self::$_client[$instanceKey];
    }
}
|
集成代码
操作类封装
文件位置:src/RocketmqTools.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
| <?php
namespace PhpTools;
use Exception; use MQ\Model\TopicMessage; use MQ\MQConsumer; use MQ\MQProducer; use MQ\Exception\MessageNotExistException; use MQ\Exception\AckMessageException; use PhpTools\Client\RocketmqClient;
/**
 * Small demo wrapper that publishes to and consumes from a single RocketMQ topic
 * through the client returned by RocketmqClient.
 */
class RocketmqTools
{
    /** @var \MQ\MQClient Client obtained from RocketmqClient::getInstance(). */
    private $_client;

    /** @var MQProducer Producer bound to the instance id and topic given to the constructor. */
    private $_producer;

    /** @var MQConsumer|null Consumer for the topic; stays null when no group id was supplied. */
    private $_consumer;

    /**
     * @param string $topic      Topic to publish to / consume from.
     * @param string $instanceId RocketMQ instance id.
     * @param string $groupId    Consumer group id; pass an empty value for a produce-only object.
     *
     * @throws Exception Propagated from RocketmqClient::getInstance() when the settings are incomplete.
     */
    public function __construct($topic, $instanceId, $groupId)
    {
        // Placeholder credentials: replace with real values before running.
        $config = [
            'endPoint' => "******",
            'accessKey' => "******",
            'secretKey' => "******"
        ];

        $this->_client = RocketmqClient::getInstance($config);
        $this->_producer = $this->_client->getProducer($instanceId, $topic);

        if (!empty($groupId)) {
            $this->_consumer = $this->_client->getConsumer($instanceId, $topic, $groupId);
        }
    }

    /**
     * Publish four sample messages and print the id and body MD5 of each.
     *
     * Any exception stops the loop and its message is printed instead of being rethrown.
     */
    public function produceSimpleMsg()
    {
        try {
            for ($i = 1; $i <= 4; $i++) {
                $publishMessage = new TopicMessage(
                    "这是一条简单消息{$i}"
                );
                $publishMessage->putProperty("a", $i);
                $publishMessage->setMessageKey("MessageKey");

                // Even-numbered messages get a start-deliver time of now + 10 seconds,
                // expressed as a millisecond timestamp.
                if ($i % 2 == 0) {
                    $publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000);
                }

                $result = $this->_producer->publishMessage($publishMessage);

                print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
            }
        } catch (Exception $e) {
            print_r($e->getMessage() . "\n");
        }
    }

    /**
     * Poll the topic forever: fetch a batch, print every message, then acknowledge the batch.
     *
     * This method only returns by throwing; the loop itself has no exit condition.
     *
     * @throws Exception When the object was constructed without a consumer group id.
     */
    public function consume()
    {
        // Without this guard the first consumeMessage() call would be a fatal
        // "member function on null" error, which the catch blocks below cannot intercept.
        if ($this->_consumer === null) {
            throw new Exception('consumer is not initialized: a non-empty groupId is required');
        }

        while (true) {
            try {
                $messages = $this->_consumer->consumeMessage(
                    // Consume at most 3 messages per request (up to 16 can be configured).
                    3,
                    // Second argument; presumably the long-polling wait in seconds (confirm against the SDK).
                    3
                );
            } catch (MessageNotExistException $e) {
                // Nothing to consume right now: poll again immediately.
                printf("No message, contine long polling!RequestId:%s\n", $e->getRequestId());
                continue;
            } catch (Exception $e) {
                // Any other failure: report it and back off briefly before retrying.
                print_r($e->getMessage() . "\n");
                sleep(3);
                continue;
            }

            print "consume finish, messages:\n";

            // Collect the receipt handles so the whole batch is acknowledged in one call.
            $receiptHandles = [];
            foreach ($messages as $message) {
                $receiptHandles[] = $message->getReceiptHandle();
                printf(
                    "MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\n",
                    $message->getMessageId(),
                    $message->getMessageTag(),
                    $message->getMessageBody(),
                    $message->getPublishTime(),
                    $message->getFirstConsumeTime(),
                    $message->getConsumedTimes(),
                    $message->getNextConsumeTime(),
                    $message->getMessageKey()
                );
                print_r($message->getProperties());
            }
            print_r($receiptHandles);

            try {
                $this->_consumer->ackMessage($receiptHandles);
            } catch (AckMessageException $e) {
                printf("Ack Error, RequestId:%s\n", $e->getRequestId());
                foreach ($e->getAckMessageErrorItems() as $errorItem) {
                    // The third column is the error message, not the error code a second time.
                    printf(
                        "\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
                        $errorItem->getReceiptHandle(),
                        $errorItem->getErrorCode(),
                        $errorItem->getErrorMessage()
                    );
                }
            } catch (Exception $e) {
                // Other ack failures used to be dropped silently; report them and keep polling.
                print_r($e->getMessage() . "\n");
            }

            print "ack finish\n";
        }
    }
}
|
简单使用
创建测试用例
文件位置:tests/Cases/RocketmqTest.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| <?php
namespace ToolsTest\Cases;
use PhpTools\RocketmqTools; use PHPUnit\Framework\TestCase;
/**
 * Manual smoke tests for RocketmqTools; both need a reachable RocketMQ instance.
 */
class RocketmqTest extends TestCase
{
    /**
     * Publishes the sample messages using a produce-only (no consumer group) object.
     */
    public function testProduceSimpleMsg()
    {
        $tools = $this->connect('');
        $tools->produceSimpleMsg();
    }

    /**
     * Starts the consumer loop for the test consumer group.
     */
    public function testConsume()
    {
        $tools = $this->connect("GID_PHP_TEST");
        $tools->consume();
    }

    /**
     * Build a RocketmqTools object for the shared test topic and instance.
     *
     * @param string $consumerGroup Consumer group id, or an empty string for none.
     *
     * @return RocketmqTools
     */
    private function connect($consumerGroup)
    {
        return new RocketmqTools(
            "php-simple-topic",
            "MQ_INST_1050827944341157_BYN8fhkQ",
            $consumerGroup
        );
    }
}
|
请求示例
生产普通消息
1
| ➜ php-tools git:(master) phpunit -c phpunit.xml --colors=always tests/Cases/RocketmqTest.php --filter testProduceSimpleMsg
|
消费消息
1
| ➜ php-tools git:(master) phpunit -c phpunit.xml --colors=always tests/Cases/RocketmqTest.php --filter testConsume
|