0%

延时队列

选择大于努力,思路决定出路,观念决定命运,梦想照亮人生。—— 张旭 《思维力:思路决定出路,观念决定行动》

一、概念

      延时队列,顾名思义,其首先是一种队列,即一种先进先出的数据结构。它与队列最大的区别在于延时属性上,延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。从某种意义上来讲,延迟队列的结构并不像一个队列,而更像是一种以时间为权重的有序堆结构。

  1. 使用场景

    • 订单在十分钟之内未支付则自动取消。
    • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
    • 账单在一周内未支付,则自动结算。
    • 用户注册成功后,如果三天内没有登陆则进行短信提醒。
    • 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
    • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
  2. 实现方案

    • 基于Redis有序集合实现

    • 定时任务轮询数据库,看是否有产生新任务,如果产生则消费任务

    • pcntl_alarm为进程设置一个闹钟信号

    • swoole的异步高精度定时器:swoole_time_tick和swoole_time_after

      • 有点类似于javascript中setInterval和setTimeout
    • rabbitmq延迟任务

      • TTL + DLX
      • 使用延迟插件

      延迟队列

二、实战

  1. 基于Redis有序集合Zset实现
    • 根据score值范围查询[0,当前时间戳]即为要处理的订单
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<?php

class DelayQueue {
public $redis;

public function __construct() {
$redis = new \Redis();
$res = $redis->connect('127.0.0.1', 6379, 5);
if ($res === false) {
throw new Exception('连接失败');
}
$this->redis = $redis;
}

public function set($key, $value, $score) {
$res = $this->redis->zAdd($key, ['NX'], $score, $value);
if ($res == 0) {
throw new Exception('入队列失败');
}
}

public function deal($key) {
while (true) {
$res = $this->redis->zRangeByScore($key, 0, time(), ['limit' => [0, 1]]);
echo '正常处理' . PHP_EOL;
if (empty($res)) {
sleep(1);
continue;
}
$value = $res[0];
$res = $this->redis->zRem($key, $value);
if ($res) {
var_dump(sprintf("订单【%s】30分钟未支付,已自动取消", $value));
}
}
}
}

$model = new DelayQueue();
$key = "order:delayqueue";

$ordId = "order1";
$model->set($key, $ordId, time()+5);

$ordId = "order2";
$model->set($key, $ordId, time()+10);

$ordId = "order3";
$model->set($key, $ordId, time()+15);

$model->deal($key);
  1. 基于php-amqplib实现(RabbitMQ3.9.15 Erlang24.3.3)

    • TTL(Time To Live):消息的存活时间,RabbitMQ可以通过x-message-ttl参数来设置指定队列Queue和消息Message的存活时间,它的值是一个非负整数,单位为微秒。

    • DLX(Dead Letter Exchanges):死信交换机,绑定在死信交换机上的队列就是死信队列。RabbitMQ的队列Queue可以配置两个参数x-dead-letter-exchangex-dead-letter-routing-key(可选),一旦队列内出现了死信(Dead Letter),则按照这两个参数可以将消息重新路由到另一个交换机Exchange从而让消息重新被消费。

    • 队列出现死信的情况有:

      • 消息或者队列的TTL过期
      • 队列达到最大长度
      • 消息被消费端拒绝
    • 实现

      • composer.json
      1
      2
      3
      4
      5
      6
      {
      "require": {
      "phpunit/phpunit": "^9.4",
      "php-amqplib/php-amqplib": "^3.0"
      }
      }
      • composer install
      • 生产者producer.php
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      require 'vendor/autoload.php';

      use PhpAmqpLib\Connection\AMQPStreamConnection;
      use PhpAmqpLib\Message\AMQPMessage;
      use PhpAmqpLib\Wire\AMQPTable;
      use PhpAmqpLib\Exchange\AMQPExchangeType;

      $conf = [
      'host' => 'localhost',
      'port' => 5672,
      'user' => 'guest',
      'password' => 'guest',
      'vhost' => '/',
      ];
      $exchangeName = 'origindelayExchange';
      $queueName = 'originTtlQueue';
      $delayName = 'originDelayQueue';

      $connect = new AMQPStreamConnection($conf['host'], $conf['port'], $conf['user'], $conf['password'], $conf['vhost']);
      $channel = $connect->channel();

      $args = new AMQPTable();
      $args->set('x-dead-letter-exchange', $exchangeName);
      $args->set('x-message-ttl', 10000);
      $args->set('x-dead-letter-routing-key', $queueName);
      // 不能写为$delayName,否则ttlQueue过期后不会路由到delayQueue
      // $args->set('x-dead-letter-routing-key', $delayName);

      $channel->queue_declare($queueName, false, true, false, false, false, $args);
      $channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, true, false);
      $channel->queue_declare($delayName, false, true, false, false);
      $channel->queue_bind($delayName, $exchangeName, $queueName, false);

      // $channel->confirm_select();
      // $channel->set_ack_handler(function (AMQPMessage $msg){
      // echo 'ack:' . $msg->getBody() .PHP_EOL;
      // });
      // $channel->set_nack_handler(function (AMQPMessage $msg){
      // echo 'nack:' . $msg->getBody() .PHP_EOL;
      // });

      for ($i=1; $i<=100; $i++) {
      $msgBody = json_encode(["name" => "liusir", "no" => $i, "sql" => "select * from user"]);
      $msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]);

      // useless
      // $tag = rand(0, 10000000);
      // $msg->setDeliveryTag($tag);

      // 不能指定exchange,否则ttlQueue不生效
      // $ret = $channel->basic_publish($msg, $exchangeName, $queueName);

      $ret = $channel->basic_publish($msg, '', $queueName);

      // single confirm
      // $channel->wait_for_pending_acks(2); //等待ACK确认

      echo "{$i}入队成功!!!", PHP_EOL;
      }

      // multi confirm
      // $channel->wait_for_pending_acks(2); //等待ACK确认

      $channel->close();
      $connect->close();
      • 消费者consumer.php
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      require 'vendor/autoload.php';

      use PhpAmqpLib\Connection\AMQPStreamConnection;
      use PhpAmqpLib\Wire\AMQPTable;

      $conf = [
      'host' => 'localhost',
      'port' => 5672,
      'user' => 'guest',
      'password' => 'guest',
      'vhost' => '/',
      ];

      $exchangeName = 'origindelayExchange';
      $queueName = 'originTtlQueue';
      $routingKey = 'originDelayQueue';
      $conn = new AMQPStreamConnection($conf['host'], $conf['port'], $conf['user'], $conf['password'], $conf['vhost']);
      $channel = $conn->channel();

      // 方式一:声明ttl队列再消费
      // $args = new AMQPTable();
      // $args->set('x-dead-letter-exchange', $exchangeName);
      // $args->set('x-message-ttl',10000);
      // $args->set('x-dead-letter-routing-key', $queueName);
      // $channel->queue_declare($routingKey, false, true, false, false, false, $args);

      // 方式二:声明普通队列再消费
      // $channel->queue_declare($routingKey, false, true, false, false, false);

      // 方式三:不声明队列直接消费(前提是队列已存在)
      // none code

      $channel->basic_qos(null, 1, null);

      $callback = function ($msg) {
      $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
      echo "Received: ", json_decode($msg->body)->no, "\n";
      // $msg->nack(true);
      };

      // 消费delay队列
      $channel->basic_consume($routingKey, '', false, false, false, false, $callback);
      // 直接消费ttl队列
      // $channel->basic_consume($queueName, '', false, false, false, false, $callback);


      while ($channel->is_consuming()) {
      $channel->wait();
      // sleep(1);
      }
  1. 基于amqp扩展(1.11.0) + rabbitmq_delayed_message_exchange插件实现(RabbitMQ3.9.15 Erlang24.3.3)

    • 启用插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange

      • brew安装的rabbitmq有可能默认没有此插件,需要手动下载
        • 找到合适的版本
        • 切换到插件目录cd your_rabbitmq/plugins
        • 下载wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
        • 启用插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    • 安装扩展amqp

      • 源码安装
        • 下载wget https://pecl.php.net/get/amqp-1.11.0.tgz
        • 解压tar -zxvf amqp-1.11.0.tgz
        • cd amqp-1.11.0
        • sudo /usr/local/php/7.1/bin/phpize
        • sudo ./configure –with-php-config=/usr/local/php/7.1/bin/php-config
          • 报错librabbitmq not found
          • 查找lib库brew search librabbitmq,没有
          • 安装rabbitmq-cbrew install rabbitmq-c
        • TODO
      • pecl安装
        • brew install rabbitmq-c
        • sudo curl https://pecl.php.net/get/amqp-1.11.0.tgz -o amqp-1.11.0.tgz
        • sudo ./pecl install amqp-1.11.0.tgz
          • Set the path to librabbitmq install prefix [autodetect] :设置rabbitmq-c目录
    • 实现

      • producer.php
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      $exchangeName = 'pluginDelayedExchange';
      $queueName = 'pluginTtlQueue';
      $routeKey = 'pluginDelayQueue';

      $config = array(
      'host' => 'localhost',
      'port' => 5672,
      'login' => 'guest',
      'password' => 'guest',
      'vhost' => 'liusir'
      );
      // var_dump(extension_loaded('amqp'));

      $conn = new AMQPConnection($config);
      $conn->connect();
      $channel = new AMQPChannel($conn);
      $exchange = new AMQPExchange($channel);
      // $ref = new ReflectionClass($conn);
      // $ref = new ReflectionClass($channel);
      // $ref = new ReflectionClass($exchange);

      $exchange->setName($exchangeName);
      $exchange->setType('x-delayed-message');
      $exchange->setArgument('x-delayed-type', 'direct');
      $exchange->declareExchange();

      // $channel->startTransaction();
      $queue = new AMQPQueue($channel);
      // $ref = new ReflectionClass($queue);
      $queue->setName($routeKey);

      $queue->setFlags(AMQP_DURABLE);
      $queue->declareQueue();
      $queue->bind($exchangeName, $queueName);

      for ($i=1; $i<=5; $i++) {
      $message = json_encode(['order_id' => time(), 'i' => $i]);
      $ret = $exchange->publish($message, $queueName, AMQP_NOPARAM, ['headers' => ['x-delay' => 10000]]);
      // if (!$ret) {
      // $channel->rollbackTransaction();
      // }
      echo "消息 $i 发送成功",PHP_EOL;
      // sleep(2);
      }

      // $channel->commitTransaction();
      $conn->disconnect();
      • consumer.php
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      $exchangeName = 'pluginDelayedExchange';
      $queueName = 'pluginTtlQueue';
      $routeKey = 'pluginDelayQueue';

      $config = array(
      'host' => 'localhost',
      'port' => 5672,
      'login' => 'guest',
      'password' => 'guest',
      'vhost' => '/'
      );

      //var_dump(extension_loaded('amqp'));

      $conn = new AMQPConnection($config);
      $conn->connect();
      $channel = new AMQPChannel($conn);

      $exchange = new AMQPExchange($channel);
      $exchange->setName($exchangeName);
      $exchange->setType('x-delayed-message');
      $exchange->setArgument('x-delayed-type', 'direct');
      $exchange->declareExchange();

      //$channel->startTransaction();
      $queue = new AMQPQueue($channel);
      $queue->setName($routeKey);
      $queue->setFlags(AMQP_DURABLE);
      $queue->declareQueue();
      $queue->bind($exchangeName, $routeKey);
      //$channel->commitTransaction();

      function callback(AMQPEnvelope $message) {
      global $queue;
      if ($message) {
      $body = $message->getBody();
      echo '接收内容:'.$body . PHP_EOL;
      $queue->ack($message->getDeliveryTag());
      } else {
      echo 'no message' . PHP_EOL;
      }
      }

      $action = '1';
      if ($action == '1') {
      $queue->consume('callback');
      } else {
      $start = time();
      while(true) {
      $message = $queue->get();
      if(!empty($message)) {
      echo '接收内容:' . $message->getBody() . PHP_EOL;
      $queue->ack($message->getDeliveryTag());
      $end = time();
      echo '运行时间:' . ($end - $start) . '秒' . PHP_EOL;
      } else {
      // echo 'no message' . PHP_EOL;
      }
      }
      }

三、参考

  1. 参考一
  2. 参考二
  3. 参考三
  4. 参考四
  5. 参考五
  6. 参考六
  7. 参考七
  8. 参考八
  9. 参考九
  10. 参考十