0%

PHP之延时队列示例

延时队列,顾名思义,其首先是一种队列,即一种先进先出的数据结构。

一、概念

      延时队列,顾名思义,其首先是一种队列,即一种先进先出的数据结构。它与队列最大的区别在于延时属性上,延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。从某种意义上来讲,延迟队列的结构并不像一个队列,而更像是一种以时间为权重的有序堆结构。

二、实现

  1. 基于Redis有序集合Zset实现
    • 根据score值范围查询[0,当前时间戳]即为要处理的订单
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<?php

class DelayQueue {
public $redis;

public function __construct() {
$redis = new \Redis();
$res = $redis->connect('127.0.0.1', 6379, 5);
if ($res === false) {
throw new Exception('连接失败');
}
$this->redis = $redis;
}

public function set($key, $value, $score) {
$res = $this->redis->zAdd($key, ['NX'], $score, $value);
if ($res == 0) {
throw new Exception('入队列失败');
}
}

public function deal($key) {
while (true) {
$res = $this->redis->zRangeByScore($key, 0, time(), ['limit' => [0, 1]]);
echo '正常处理' . PHP_EOL;
if (empty($res)) {
sleep(1);
continue;
}
$value = $res[0];
$res = $this->redis->zRem($key, $value);
if ($res) {
var_dump(sprintf("订单【%s】30分钟未支付,已自动取消", $value));
}
}
}
}

$model = new DelayQueue();
$key = "order:delayqueue";

$ordId = "order1";
$model->set($key, $ordId, time()+5);

$ordId = "order2";
$model->set($key, $ordId, time()+10);

$ordId = "order3";
$model->set($key, $ordId, time()+15);

$model->deal($key);
  1. 基于RabbitMQ死信队列DXL实现
    • TTL(Time To Live):消息的存活时间,RabbitMQ可以通过x-message-ttl参数来设置指定队列Queue和消息Message的存活时间,它的值是一个非负整数,单位为微秒。
    • DLX(Dead Letter Exchanges):死信交换机,绑定在死信交换机上的队列就是死信队列。RabbitMQ的队列Queue可以配置两个参数x-dead-letter-exchangex-dead-letter-routing-key(可选),一旦队列内出现了死信(Dead Letter),则按照这两个参数可以将消息重新路由到另一个交换机Exchange从而让消息重新被消费。
      • 队列出现死信的情况有:
        • 消息或者队列的TTL过期
        • 队列达到最大长度
        • 消息被消费端拒绝
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
### composer.json
{
"require": {
"phpunit/phpunit": "^9.4",
"php-amqplib/php-amqplib": "^3.0"
}
}

### RabbitMQ.php
<?php

namespace RabbitMQ;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RabbitMQ
{

private $host = '127.0.0.1';
private $port = 5672;
private $user = 'guest';
private $password = 'guest';
protected $connection;
protected $channel;

public function __construct() {
$this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password);
$this->channel = $this->connection->channel();
}

public function sendMessage($message, $routeKey, $exchange = '', $properties = []) {
$data = new AMQPMessage(
$message, $properties
);
$this->channel->basic_publish($data, $exchange, $routeKey);
}

public function consumeMessage($queueName,$callback) {
$this->channel->basic_consume($queueName, '', false, false, false, false, $callback);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}

public function __destruct() {
$this->channel->close();
$this->connection->close();
}
}

### DelayQueue.php
<?php

namespace RabbitMQ;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class DelayQueue extends RabbitMQ
{
public function createQueue($ttl, $delayExName, $delayQueueName, $queueName) {
$args = new AMQPTable([
'x-dead-letter-exchange' => $delayExName,
'x-message-ttl' => $ttl, //消息存活时间
'x-dead-letter-routing-key' => $queueName
]);
$this->channel->queue_declare($queueName, false, true, false, false, false, $args);
$this->channel->exchange_declare($delayExName, AMQPExchangeType::DIRECT, false, true, false);
$this->channel->queue_declare($delayQueueName, false, true, false, false);
$this->channel->queue_bind($delayQueueName, $delayExName, $queueName, false);
}
}

### producer.php
<?php
require_once './vendor/autoload.php';
require 'RabbitMQ.php';
require 'DelayQueue.php';

$delay = new \RabbitMQ\DelayQueue();
$ttl = 1000 * 100;//订单100s后超时
$delayExName = 'delay-order-exchange';//超时exchange
$delayQueueName = 'delay-order-queue';//超时queue
$queueName = 'ttl-order-queue';//订单queue

$delay->createQueue($ttl, $delayExName, $delayQueueName, $queueName);

//100个订单信息,每个订单超时时间都是10s
for ($i=0; $i<50; $i++) {
$data = [
'order_id' => $i + 1,
'remark' => 'this is a order test'
];
$delay->sendMessage(json_encode($data), $queueName);
sleep(1);
echo $data['order_id'] . "入队成功!!!", PHP_EOL;
}

### consumer.php
<?php
require_once './vendor/autoload.php';
require 'RabbitMQ.php';
require 'DelayQueue.php';

// 消费者

$delay = new \RabbitMQ\DelayQueue();
$delayQueueName = 'delay-order-queue';
$callback = function ($msg) {
echo $msg->body . PHP_EOL;
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
//处理订单超时逻辑,给用户推送提醒等等。。。
sleep(5);
};

$delay->consumeMessage($delayQueueName, $callback);

三、参考

  1. 参考一
  2. 参考二
  3. 参考三
  4. 参考四