选择大于努力,思路决定出路,观念决定命运,梦想照亮人生。—— 张旭 《思维力:思路决定出路,观念决定行动》
一、概念
延时队列,顾名思义,其首先是一种队列,即一种先进先出的数据结构。它与队列最大的区别在于延时属性上,延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。从某种意义上来讲,延迟队列的结构并不像一个队列,而更像是一种以时间为权重的有序堆结构。
使用场景
- 订单在十分钟之内未支付则自动取消。
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 账单在一周内未支付,则自动结算。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
实现方案
基于Redis有序集合实现
定时任务轮询数据库,看是否有产生新任务,如果产生则消费任务
pcntl_alarm为进程设置一个闹钟信号
swoole的异步高精度定时器:swoole_time_tick和swoole_time_after
- 有点类似于javascript中setInterval和setTimeout
rabbitmq延迟任务
- TTL + DLX
- 使用延迟插件
二、实战
- 基于Redis有序集合Zset实现
- 根据score值范围查询
[0,当前时间戳]
即为要处理的订单
- 根据score值范围查询
1 | <?php |
基于php-amqplib实现(RabbitMQ3.9.15 Erlang24.3.3)
TTL(Time To Live):消息的存活时间,RabbitMQ可以通过
x-message-ttl
参数来设置指定队列Queue和消息Message的存活时间,它的值是一个非负整数,单位为微秒。DLX(Dead Letter Exchanges):死信交换机,绑定在死信交换机上的队列就是死信队列。RabbitMQ的队列Queue可以配置两个参数
x-dead-letter-exchange
和x-dead-letter-routing-key(可选)
,一旦队列内出现了死信(Dead Letter),则按照这两个参数可以将消息重新路由到另一个交换机Exchange从而让消息重新被消费。队列出现死信的情况有:
- 消息或者队列的TTL过期
- 队列达到最大长度
- 消息被消费端拒绝
实现
- composer.json
1
2
3
4
5
6{
"require": {
"phpunit/phpunit": "^9.4",
"php-amqplib/php-amqplib": "^3.0"
}
}- composer install
- 生产者producer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65require 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$conf = [
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
];
$exchangeName = 'origindelayExchange';
$queueName = 'originTtlQueue';
$delayName = 'originDelayQueue';
$connect = new AMQPStreamConnection($conf['host'], $conf['port'], $conf['user'], $conf['password'], $conf['vhost']);
$channel = $connect->channel();
$args = new AMQPTable();
$args->set('x-dead-letter-exchange', $exchangeName);
$args->set('x-message-ttl', 10000);
$args->set('x-dead-letter-routing-key', $queueName);
// 不能写为$delayName,否则ttlQueue过期后不会路由到delayQueue
// $args->set('x-dead-letter-routing-key', $delayName);
$channel->queue_declare($queueName, false, true, false, false, false, $args);
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_declare($delayName, false, true, false, false);
$channel->queue_bind($delayName, $exchangeName, $queueName, false);
// $channel->confirm_select();
// $channel->set_ack_handler(function (AMQPMessage $msg){
// echo 'ack:' . $msg->getBody() .PHP_EOL;
// });
// $channel->set_nack_handler(function (AMQPMessage $msg){
// echo 'nack:' . $msg->getBody() .PHP_EOL;
// });
for ($i=1; $i<=100; $i++) {
$msgBody = json_encode(["name" => "liusir", "no" => $i, "sql" => "select * from user"]);
$msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]);
// useless
// $tag = rand(0, 10000000);
// $msg->setDeliveryTag($tag);
// 不能指定exchange,否则ttlQueue不生效
// $ret = $channel->basic_publish($msg, $exchangeName, $queueName);
$ret = $channel->basic_publish($msg, '', $queueName);
// single confirm
// $channel->wait_for_pending_acks(2); //等待ACK确认
echo "{$i}入队成功!!!", PHP_EOL;
}
// multi confirm
// $channel->wait_for_pending_acks(2); //等待ACK确认
$channel->close();
$connect->close();- 消费者consumer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50require 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$conf = [
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
];
$exchangeName = 'origindelayExchange';
$queueName = 'originTtlQueue';
$routingKey = 'originDelayQueue';
$conn = new AMQPStreamConnection($conf['host'], $conf['port'], $conf['user'], $conf['password'], $conf['vhost']);
$channel = $conn->channel();
// 方式一:声明ttl队列再消费
// $args = new AMQPTable();
// $args->set('x-dead-letter-exchange', $exchangeName);
// $args->set('x-message-ttl',10000);
// $args->set('x-dead-letter-routing-key', $queueName);
// $channel->queue_declare($routingKey, false, true, false, false, false, $args);
// 方式二:声明普通队列再消费
// $channel->queue_declare($routingKey, false, true, false, false, false);
// 方式三:不声明队列直接消费(前提是队列已存在)
// none code
$channel->basic_qos(null, 1, null);
$callback = function ($msg) {
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
echo "Received: ", json_decode($msg->body)->no, "\n";
// $msg->nack(true);
};
// 消费delay队列
$channel->basic_consume($routingKey, '', false, false, false, false, $callback);
// 直接消费ttl队列
// $channel->basic_consume($queueName, '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
// sleep(1);
}- php producer.php,登录http://localhost:15672/#/queues可查看到消息的路由过程
基于amqp扩展(1.11.0) + rabbitmq_delayed_message_exchange插件实现(RabbitMQ3.9.15 Erlang24.3.3)
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- brew安装的rabbitmq有可能默认没有此插件,需要手动下载
- 找到合适的版本
- 切换到插件目录
cd your_rabbitmq/plugins
- 下载
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
- 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- brew安装的rabbitmq有可能默认没有此插件,需要手动下载
-
- 源码安装
- 下载
wget https://pecl.php.net/get/amqp-1.11.0.tgz
- 解压
tar -zxvf amqp-1.11.0.tgz
- cd amqp-1.11.0
- sudo /usr/local/php/7.1/bin/phpize
- sudo ./configure –with-php-config=/usr/local/php/7.1/bin/php-config
- 报错
librabbitmq not found
- 查找lib库
brew search librabbitmq
,没有 - 安装rabbitmq-c
brew install rabbitmq-c
- 报错
- TODO
- 下载
- pecl安装
- brew install rabbitmq-c
- sudo curl https://pecl.php.net/get/amqp-1.11.0.tgz -o amqp-1.11.0.tgz
- sudo ./pecl install amqp-1.11.0.tgz
Set the path to librabbitmq install prefix [autodetect] :
设置rabbitmq-c目录
- 源码安装
实现
- producer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47$exchangeName = 'pluginDelayedExchange';
$queueName = 'pluginTtlQueue';
$routeKey = 'pluginDelayQueue';
$config = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => 'liusir'
);
// var_dump(extension_loaded('amqp'));
$conn = new AMQPConnection($config);
$conn->connect();
$channel = new AMQPChannel($conn);
$exchange = new AMQPExchange($channel);
// $ref = new ReflectionClass($conn);
// $ref = new ReflectionClass($channel);
// $ref = new ReflectionClass($exchange);
$exchange->setName($exchangeName);
$exchange->setType('x-delayed-message');
$exchange->setArgument('x-delayed-type', 'direct');
$exchange->declareExchange();
// $channel->startTransaction();
$queue = new AMQPQueue($channel);
// $ref = new ReflectionClass($queue);
$queue->setName($routeKey);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchangeName, $queueName);
for ($i=1; $i<=5; $i++) {
$message = json_encode(['order_id' => time(), 'i' => $i]);
$ret = $exchange->publish($message, $queueName, AMQP_NOPARAM, ['headers' => ['x-delay' => 10000]]);
// if (!$ret) {
// $channel->rollbackTransaction();
// }
echo "消息 $i 发送成功",PHP_EOL;
// sleep(2);
}
// $channel->commitTransaction();
$conn->disconnect();- consumer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60$exchangeName = 'pluginDelayedExchange';
$queueName = 'pluginTtlQueue';
$routeKey = 'pluginDelayQueue';
$config = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
//var_dump(extension_loaded('amqp'));
$conn = new AMQPConnection($config);
$conn->connect();
$channel = new AMQPChannel($conn);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType('x-delayed-message');
$exchange->setArgument('x-delayed-type', 'direct');
$exchange->declareExchange();
//$channel->startTransaction();
$queue = new AMQPQueue($channel);
$queue->setName($routeKey);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchangeName, $routeKey);
//$channel->commitTransaction();
function callback(AMQPEnvelope $message) {
global $queue;
if ($message) {
$body = $message->getBody();
echo '接收内容:'.$body . PHP_EOL;
$queue->ack($message->getDeliveryTag());
} else {
echo 'no message' . PHP_EOL;
}
}
$action = '1';
if ($action == '1') {
$queue->consume('callback');
} else {
$start = time();
while(true) {
$message = $queue->get();
if(!empty($message)) {
echo '接收内容:' . $message->getBody() . PHP_EOL;
$queue->ack($message->getDeliveryTag());
$end = time();
echo '运行时间:' . ($end - $start) . '秒' . PHP_EOL;
} else {
// echo 'no message' . PHP_EOL;
}
}
}- php producer.php,登录http://localhost:15672/#/queues,只可看到延时队列pluginDelayQueue的情况,看不到ttl队列