0%

RabbitMQ优先级队列

临渊羡鱼,不如退而结网,比喻空怀壮志,不如实实在在地付诸于行动。

一、基础

      队列一般都遵守着先进先出(first-in-first-out )的顺序,优先级队列有点不同,它是按照优先级的顺序出队的。优先级队列是0个或多个元素的集合,每个元素都有一个优先权或值。它分为最小优先队列和最大优先队列,一般默认最大优先队列,即权值越大优先级越高,具体到如实现如用堆数据结构实现,则分别对应小顶堆和大顶堆。它可以使用数组、链表、堆数据结构或二叉搜索树来实现。

      RabbitMQ在版本3.5.0中有优先级队列的实现,任何队列可以通过设置可选参数x-max-priority转换为优先级队列,这个参数应该是1到255之间的正整数。

二、实现

  1. composer.json
1
2
3
4
5
6
{
"require": {
"phpunit/phpunit": "^9.4",
"php-amqplib/php-amqplib": "^3.0"
}
}
  1. 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
require_once 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
try {
$routingKey = 'priorityRoute';
$exchange = 'priorityExchange';
$queue = 'priorityQueue';

$channel->exchange_declare($exchange, 'direct');

$args = new AMQPTable();
$args->set('x-max-priority', 100);
$channel->queue_declare($queue, false, true, false, false, false, $args);
$channel->queue_bind($queue, $exchange, $routingKey);

$message = new AMQPMessage('优先级1', ['delivery_mode'=>2, 'priority' => 1]);
$channel->basic_publish($message, $exchange, $routingKey, true);

$message = new AMQPMessage('优先级100', ['delivery_mode'=>2, 'priority' => 100]);
$channel->basic_publish($message, $exchange, $routingKey, true);

$message = new AMQPMessage('优先级20', ['delivery_mode'=>2, 'priority' => 20]);
$channel->basic_publish($message, $exchange, $routingKey, true);

$channel->wait_for_pending_acks(2);

} catch (\Exception $e){
echo "exception: " . $e->getMessage();
}
$channel->close();
$connection->close();
  1. 消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
require 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$queueName = 'priorityQueue';
$conn = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/', true);
$channel = $conn->channel();

$args = new AMQPTable();
$args->set('x-max-priority', 100);
$channel->queue_declare($queueName, false, true, false, false, false, $args);
$channel->basic_qos(null, 1, null);
$callback = function ($msg) {
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
// $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);
// $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], true);
echo "Received: ", $msg->body, PHP_EOL;
// $msg->nack(true);
sleep(1);
};

$channel->basic_consume($queueName, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
$channel->wait();
// sleep(1);
}
  1. 先运行生产者,再运行消费者

三、参考

  1. 参考一
  2. 参考二