介绍

PHP 连接阿里云 RocketMQ 需要通过SDK进行连接。aliyun.mq 官方提供的基于 HTTP协议 的SDK如下(截止到2022.09.30):

文档 star数量 说明
aliyunmq/mq-http-php-sdk https://github.com/aliyunmq/mq-http-php-sdk/blob/master/README.md 69 aliyun.mq官方提供

安装

安装此扩展包的首选方法是通过composer

composer require aliyunmq/mq-http-sdk:1.0.3

配置

查看源码

创建客户端:

文件位置:src/Client/RocketmqClient.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
<?php

namespace PhpTools\Client;

use Exception;
use MQ\MQClient;

class RocketmqClient
{
/**
* rocketmq的配置
* @var string
*/
private static $_config;

/**
* rocketmq的连接池
* @var object
*/
private static $_client;

/**
* 获取连接rocketmq的单例
*
* @param $config
* @return MQClient
* @throws Exception
*/
public static function getInstance($config): MQClient
{
// 首先确认已安装composer包: aliyunmq/mq-http-sdk(这里版本是1.0.3)
$instanceKey = md5(serialize($config));
if (!isset(self::$_client[$instanceKey])) {
if (empty($config['endPoint']) || empty($config['accessKey']) || empty($config['secretKey'])) {
throw new Exception('unknown endPoint/accessKey/secretKey');
}
self::$_config = [
'endPoint' => $config['endPoint'],
'accessKey' => $config['accessKey'],
'secretKey' => $config['secretKey'],
];
self::$_client[$instanceKey] = new MQClient(
self::$_config['endPoint'],
self::$_config['accessKey'],
self::$_config['secretKey'],
);
}
return self::$_client[$instanceKey];
}
}

集成代码

操作类封装

文件位置:src/RocketmqTools.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
<?php

namespace PhpTools;

use Exception;
use MQ\Model\TopicMessage;
use MQ\MQConsumer;
use MQ\MQProducer;
use MQ\Exception\MessageNotExistException;
use MQ\Exception\AckMessageException;
use PhpTools\Client\RocketmqClient;

class RocketmqTools
{
/**
* @var RocketmqClient
*/
private $_client;

/**
* @var MQProducer
*/
private $_producer;

/**
* @var MQConsumer
*/
private $_consumer;

public function __construct($topic, $instanceId, $groupId)
{
$config = [
// 设置HTTP接入域名(此处以公共云生产环境为例)
'endPoint' => "******",
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
'accessKey' => "******",
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
'secretKey' => "******"
];
$this->_client = RocketmqClient::getInstance($config);
$this->_producer = $this->_client->getProducer($instanceId, $topic);
!empty($groupId) && $this->_consumer = $this->_client->getConsumer($instanceId, $topic, $groupId);
}

/**
* 生产普通消息
*
* @return void
*/
public function produceSimpleMsg()
{
try {
for ($i = 1; $i <= 4; $i++) {
$publishMessage = new TopicMessage(
"这是一条简单消息{$i}"// 消息内容
);
// 设置属性
$publishMessage->putProperty("a", $i);
// 设置消息KEY
$publishMessage->setMessageKey("MessageKey");
if ($i % 2 == 0) {
// 定时消息, 定时时间为10s后
$publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000);
}
$result = $this->_producer->publishMessage($publishMessage);

print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
}
} catch (Exception $e) {
print_r($e->getMessage() . "\n");
}
}

/**
* 消费消息
*
* @return void
*/
public function consume()
{
// 在当前线程循环消费消息,建议是多开个几个线程并发消费消息
while (True) {
try {
// 长轮询消费消息
// 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
$messages = $this->_consumer->consumeMessage(
3, // 一次最多消费3条(最多可设置为16条)
3 // 长轮询时间3秒(最多可设置为30秒)
);
} catch (Exception $e) {
if ($e instanceof MessageNotExistException) {
// 没有消息可以消费,接着轮询
printf("No message, contine long polling!RequestId:%s\n", $e->getRequestId());
continue;
}
print_r($e->getMessage() . "\n");
sleep(3);
continue;
}
print "consume finish, messages:\n";
// 处理业务逻辑
$receiptHandles = array();
foreach ($messages as $message) {
$receiptHandles[] = $message->getReceiptHandle();
printf(
"MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\n",
$message->getMessageId(),
$message->getMessageTag(),
$message->getMessageBody(),
$message->getPublishTime(),
$message->getFirstConsumeTime(),
$message->getConsumedTimes(),
$message->getNextConsumeTime(),
$message->getMessageKey()
);
print_r($message->getProperties());
}
// $message->getNextConsumeTime()前若不确认消息消费成功,则消息会重复消费
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样
print_r($receiptHandles);
try {
$this->_consumer->ackMessage($receiptHandles);
} catch (Exception $e) {
if ($e instanceof AckMessageException) {
// 某些消息的句柄可能超时了会导致确认不成功
printf("Ack Error, RequestId:%s\n", $e->getRequestId());
foreach ($e->getAckMessageErrorItems() as $errorItem) {
printf(
"\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
$errorItem->getReceiptHandle(),
$errorItem->getErrorCode(),
$errorItem->getErrorCode()
);
}
}
}
print "ack finish\n";
}
}
}

简单使用

创建测试用例

文件位置:tests/Cases/RocketmqTest.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<?php

namespace ToolsTest\Cases;

use PhpTools\RocketmqTools;
use PHPUnit\Framework\TestCase;

class RocketmqTest extends TestCase
{
// 生产普通消息
public function testProduceSimpleMsg()
{
// 所属的 Topic
$topic = "php-simple-topic";
// Topic所属实例ID,默认实例为空NULL
$instanceId = "MQ_INST_1050827944341157_BYN8fhkQ";
$rmq = new RocketmqTools($topic, $instanceId, '');
$rmq->produceSimpleMsg();
}

// 消费消息
public function testConsume()
{
// 所属的 Topic
$topic = "php-simple-topic";
// Topic所属实例ID,默认实例为空NULL
$instanceId = "MQ_INST_1050827944341157_BYN8fhkQ";
$groupId = "GID_PHP_TEST";
$rmq = new RocketmqTools($topic, $instanceId, $groupId);
$rmq->consume();
}
}

请求示例

生产普通消息

1
➜  php-tools git:(master) phpunit -c phpunit.xml --colors=always tests/Cases/RocketmqTest.php --filter testProduceSimpleMsg

消费消息

1
➜  php-tools git:(master) phpunit -c phpunit.xml --colors=always tests/Cases/RocketmqTest.php --filter testConsume