智者不锐,慧者不傲,谋者不露,强者不暴。 —— 启功(也可能是启功引用《中国古代典籍》)
一、基础
持久化
- exchange持久化
- queue持久化
- message持久化
-
- 不可与事务共用
- 保证消息到达exchange,不保证消息可以路由到正确的queue
- 确认方式
- 单条确认
- 批量确认
- 异步确认
return机制:处理一些不可路由的消息,如exchange不存在或者路由key不存在
- 保证消息路由到正确的queue
- mandatory属性设为true
事务机制
- 不可与发布确认共用
- 方法
- channel.txSelect 是将信道设置成事务模式,
- channel.txCommit 用来提交事务
- channel.txRollback 用来回滚事务
- 原理
- TODO
消费确认
内存限制:当内存使用超过配置的阀值,RabbitMQ会暂停阻塞客户端的连接,并停止接收从客户端发来的消息,以此避免服务崩溃,客户端与服务端的心跳检测也会失效。
rabbitmqctl set_vm_memory_hight_watermark <num>
磁盘限制:当磁盘剩余空间低于确定的阀值时,RabbitMQ同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务崩溃。
rabbitmqctl set_disk_free_limit <num>
二、使用
持久化
- 使用PhpAmqpLib库
$channel->exchange_declare($exchange, 'direct', false, true);
$channel->queue_declare($queue, false, true, false, false, false);
$message = new AMQPMessage('hello world', ['delivery_mode'=>2]);
- 使用PhpAmqpLib库
发布确认
使用PhpAmqpLib库
- producer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// ./rabbitmqctl set_policy custom-policy "confirmQueue" '{"max-length":1,"overflow":"reject-publish"}' --apply-to queues
try {
$routingKey = 'confirmRoute';
$exchange = 'confirmExchange';
$queue = 'confirmQueue';
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/', true);
$channel = $connection->channel();
$channel->exchange_declare($exchange, 'direct', false, true);
$channel->queue_declare($queue, false, true, false, false, false);
$channel->queue_bind($queue, $exchange, $routingKey);
// $errorRoutingKey = 'errorConfirmRoute';
// $channel->queue_bind($queue, $exchange, $errorRoutingKey);
// case1 output:
// fallback details as below:
// replay_code:312
// reply_text:NO_ROUTE
// case2 output:
// pub failure
$channel->confirm_select();
$channel->set_ack_handler(function(AMQPMessage $msg){
echo 'pub success' . PHP_EOL;
});
$channel->set_nack_handler(function(AMQPMessage $msg){
echo 'pub failure' . PHP_EOL;
});
$channel->set_return_listener(
function ($reply_code, $reply_text, $exchange, $routing_key, AMQPMessage $msg) use ($channel, $connection) {
echo 'fallback details as below:' . PHP_EOL;
echo "replay_code:" . $reply_code, PHP_EOL;
echo "reply_text:" . $reply_text, PHP_EOL;
$channel->close();
$connection->close();
exit();
});
$message = new AMQPMessage('hello world', ['delivery_mode'=>2]);
$channel->basic_publish($message, $exchange, $routingKey, true);
// $channel->basic_publish($message, $exchange, '', true);
// fallback details as below:
// replay_code:312
// reply_text:NO_ROUTE
$channel->wait_for_pending_acks_returns();
$channel->close();
$connection->close();
} catch (\Exception $e){
echo "exception: " . $e->getMessage();
}- comsumer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34require 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$exchangeName = 'confirmRoute';
$queueName = 'confirmExchange';
$routingKey = 'confirmQueue';
$conn = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/', true);
$channel = $conn->channel();
// 声明队列再消费
$channel->queue_declare($routingKey, false, true, false, false, false);
// 不声明队列直接消费(前提是队列已存在)
// none code
$channel->basic_qos(null, 1, null);
$callback = function ($msg) {
// $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
// $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);
$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], true);
echo "Received: ", $msg->body, PHP_EOL;
// $msg->nack(true);
sleep(1);
};
$channel->basic_consume($routingKey, '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
// sleep(1);
}使用amqp扩展
- producer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55$config = ['host'=>'127.0.0.1', 'port'=>'5672', 'login'=>'guest', 'password'=>'guest', 'vhost'=>'/'];
$exchange_name = 'transaction_exchange_name';
$queue_name = 'transaction_queue_name';
$route_key = 'transaction_route_key';
$conn = new AMQPConnection($config);
$conn->connect();
$amqpChannel = new AMQPChannel($conn);
$amqpExchange = new AMQPExchange($amqpChannel);
$amqpExchange->setName($exchange_name);
$amqpExchange->setType(AMQP_EX_TYPE_DIRECT);
$amqpExchange->setFlags(AMQP_DURABLE);
$amqpExchange->declareExchange();
// 先运行消费脚本队列已存在则不用在此声明,未运行且队列不存在则应提前声明
// $queue = new AMQPQueue($amqpChannel);
// $queue->setName($queue_name);
// $queue->setFlags(AMQP_DURABLE);
// $queue->declareQueue();
// $queue->bind($exchange_name, $route_key);
try {
$amqpChannel->confirmSelect();
$message = json_encode(['user_id'=>1, 'time'=>time()]);
$ackCallback = function ($delivery_tag, $multiple) {
echo "message arrives at exchange",PHP_EOL;
};
$nackCallback = function ($delivery_tag, $multiple, $requeue) use ($message) {
echo "message cannot reach exchange",PHP_EOL;
// republish
};
$unreachableCallBack = function ($reply_code, $reply_text, $exchange, $routing_key, $properties, $body) {
echo 'message cannot reach queue',PHP_EOL;
// echo $reply_code,PHP_EOL;
// echo $reply_text,PHP_EOL;
};
// $amqpExchange->publish($message, $route_key, AMQP_DURABLE);
// 触发$unreachableCallBack要设置发布类型为AMQP_MANDATORY
// $amqpExchange->publish($message, '', AMQP_MANDATORY);//当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者;当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。
// $amqpExchange->publish($message, $route_key, AMQP_IMMEDIATE);//当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue都没有消费者时,该消息会通过basic.return方法返还给生产者。
// RabbitMQ3.0+的版本里去掉了immediate参数的支持,运行时报错`Server connection error: 540, message: NOT_IMPLEMENTED - immediate=true`
// 消息能否到达exchange
$amqpChannel->setConfirmCallback($ackCallback, $nackCallback);
// 消息能否到达queue
$amqpChannel->setReturnCallback($unreachableCallBack);
$amqpChannel->waitForConfirm(2);
} catch (Exception $e) {
echo $e->getMessage();
}
$conn->disconnect();- consumer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45$config = ['host'=>'127.0.0.1', 'port'=>'5672', 'login'=>'guest', 'password'=>'guest', 'vhost'=>'/'];
$exchange_name = 'transaction_exchange_name';
$queue_name = 'transaction_queue_name';
$route_key = 'transaction_route_key';
$conn = new AMQPConnection($config);
$conn->connect();
$amqpChannel = new AMQPChannel($conn);
// $amqpExchange = new AMQPExchange($amqpChannel);
// $amqpExchange->setName($exchange_name);
// $amqpExchange->setType(AMQP_EX_TYPE_DIRECT);
// $amqpExchange->setFlags(AMQP_DURABLE);
// $amqpExchange->declareExchange();
$amqpQueue = new AMQPQueue($amqpChannel);
$amqpQueue->setName($queue_name);
$amqpQueue->setFlags(AMQP_DURABLE);
$amqpQueue->declareQueue();
$amqpQueue->bind($exchange_name, $route_key);
$callbackManual = function (AMQPEnvelope $envelope, AMQPQueue $queue) {
echo $envelope->getBody(), PHP_EOL;
// 可通过注释、打开此行代码,查看web管理界面队列消息状态
// $queue->ack($envelope->getDeliveryTag());
$queue->nack($envelope->getDeliveryTag(), AMQP_NOWAIT);
};
$callbackAuto = function (AMQPEnvelope $envelope, AMQPQueue $queue) {
echo $envelope->getBody(), PHP_EOL;
};
// 阻塞模式接收消息
while (true) {
// 手动ACK应答确认
$amqpQueue->consume($callbackManual);
// 自动ACK应答确认
// $amqpQueue->consume($callbackAuto, AMQP_AUTOACK);
}
$conn->disconnect();
return机制
使用PhpAmqpLib库
- procuder.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
try {
$routingKey = 'confirmRoute';
$exchange = 'confirmExchange';
$queue = 'confirmQueue';
$channel->exchange_declare($exchange, 'direct');
$channel->queue_declare($queue, false, true, false, false, false);
$channel->queue_bind($queue, $exchange, $routingKey);
// confirm机制
$channel->confirm_select();
$channel->set_ack_handler(function (AMQPMessage $msg){
echo 'send success: ' . $msg->getBody(),PHP_EOL;
});
$channel->set_nack_handler(function (AMQPMessage $msg){
echo 'send fail: ' . $msg->getBody(),PHP_EOL;
});
// return机制
$channel->set_return_listener(function ($reply_code, $reply_text, $exchange, $routing_key, $properties){
echo 'message cannot reach queue',PHP_EOL;
echo $reply_code,PHP_EOL;
echo $reply_text,PHP_EOL;
});
$message = new AMQPMessage('hello world', ['delivery_mode'=>2]);
// $channel->basic_publish($message, $exchange, $routingKey, true);
// $channel->wait_for_pending_acks(2);
$errorRoute = 'errorRoute';
$channel->basic_publish($message, $exchange, $errorRoute, true, false);
while(1) {
$channel->wait();
}
} catch (\Exception $e){
echo "exception: " . $e->getMessage();
}
$channel->close();
$connection->close();
事务
使用PhpAmqpLib库
- procuder.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/', true);
$channel = $connection->channel();
try {
$routingKey = 'transactionRoute';
$exchange = 'transactionExchange';
$queue = 'transactionQueue';
$channel->exchange_declare($exchange, 'direct', false, true);
$channel->queue_declare($queue, false, true, false, false, false);
$channel->queue_bind($queue, $exchange, $routingKey);
$channel->tx_select();
$message = new AMQPMessage('hello world', ['delivery_mode'=>2]);
$channel->basic_publish($message, $exchange, $routingKey, true);
$channel->tx_commit();
echo "pub success", PHP_EOL;
$channel->close();
$connection->close();
} catch (\Exception $e){
$channel->tx_rollback();
$channel->close();
$connection->close();
echo "exception: " . $e->getMessage();
}使用amqp扩展
- producer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32$config = ['host'=>'127.0.0.1', 'port'=>'5672', 'login'=>'guest', 'password'=>'guest', 'vhost'=>'/'];
$exchange_name = 'transaction_exchange_name';
$queue_name = 'transaction_queue_name';
$route_key = 'transaction_route_key';
$conn = new AMQPConnection($config);
$conn->connect();
$amqpChannel = new AMQPChannel($conn);
$amqpExchange = new AMQPExchange($amqpChannel);
$amqpExchange->setName($exchange_name);
$amqpExchange->setType(AMQP_EX_TYPE_DIRECT);
$amqpExchange->setFlags(AMQP_DURABLE);
$amqpExchange->declareExchange();
// 先运行消费脚本队列已存在则不用在此声明,未运行且队列不存在则应声明
// $queue = new AMQPQueue($amqpChannel);
// $queue->setName($queue_name);
// $queue->setFlags(AMQP_DURABLE);
// $queue->declareQueue();
// $queue->bind($exchange_name, $route_key);
try {
$amqpChannel->startTransaction();
$message = json_encode(['user_id'=>1, 'time'=>time()]);
$amqpExchange->publish($message, $route_key, AMQP_DURABLE);
$amqpChannel->commitTransaction();
} catch (Exception $e) {
$amqpChannel->rollbackTransaction();
}
$conn->disconnect();
消费确认
使用amqp扩展
- producer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54$config = ['host'=>'127.0.0.1', 'port'=>'5672', 'login'=>'guest', 'password'=>'guest', 'vhost'=>'/'];
$exchange_name = 'transaction_exchange_name';
$queue_name = 'transaction_queue_name';
$route_key = 'transaction_route_key';
$conn = new AMQPConnection($config);
$conn->connect();
$amqpChannel = new AMQPChannel($conn);
$amqpExchange = new AMQPExchange($amqpChannel);
$amqpExchange->setName($exchange_name);
$amqpExchange->setType(AMQP_EX_TYPE_DIRECT);
$amqpExchange->setFlags(AMQP_DURABLE);
$amqpExchange->declareExchange();
// 先运行消费脚本队列已存在则不用在此声明,未运行且队列不存在则应提前声明
// $queue = new AMQPQueue($amqpChannel);
// $queue->setName($queue_name);
// $queue->setFlags(AMQP_DURABLE);
// $queue->declareQueue();
// $queue->bind($exchange_name, $route_key);
try {
$amqpChannel->confirmSelect();
$message = json_encode(['user_id'=>1, 'time'=>time()]);
$ackCallback = function ($delivery_tag, $multiple) {
echo "message arrives at exchange",PHP_EOL;
};
$nackCallback = function ($delivery_tag, $multiple, $requeue) use ($message) {
echo "message cannot reach exchange",PHP_EOL;
// republish
};
$unreachableCallBack = function ($reply_code, $reply_text, $exchange, $routing_key, $properties, $body) {
echo 'message cannot reach queue',PHP_EOL;
// echo $reply_code,PHP_EOL;
// echo $reply_text,PHP_EOL;
};
$amqpExchange->publish($message, $route_key, AMQP_DURABLE);
// 触发$unreachableCallBack要设置发布类型为AMQP_MANDATORY
// $amqpExchange->publish($message, '', AMQP_MANDATORY);//当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者;当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。
// $amqpExchange->publish($message, $route_key, AMQP_IMMEDIATE);//当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue都没有消费者时,该消息会通过basic.return方法返还给生产者。
// RabbitMQ3.0+的版本里去掉了immediate参数的支持,运行时报错`Server connection error: 540, message: NOT_IMPLEMENTED - immediate=true`
// 消息能否到达exchange
$amqpChannel->setConfirmCallback($ackCallback, $nackCallback);
// 消息能否到达queue
$amqpChannel->setReturnCallback($unreachableCallBack);
$amqpChannel->waitForConfirm(2);
} catch (Exception $e) {
echo $e->getMessage();
}
$conn->disconnect();- consumer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46$config = ['host'=>'127.0.0.1', 'port'=>'5672', 'login'=>'guest', 'password'=>'guest', 'vhost'=>'/'];
$exchange_name = 'transaction_exchange_name';
$queue_name = 'transaction_queue_name';
$route_key = 'transaction_route_key';
$conn = new AMQPConnection($config);
$conn->connect();
$amqpChannel = new AMQPChannel($conn);
// $amqpExchange = new AMQPExchange($amqpChannel);
// $amqpExchange->setName($exchange_name);
// $amqpExchange->setType(AMQP_EX_TYPE_DIRECT);
// $amqpExchange->setFlags(AMQP_DURABLE);
// $amqpExchange->declareExchange();
$amqpQueue = new AMQPQueue($amqpChannel);
$amqpQueue->setName($queue_name);
$amqpQueue->setFlags(AMQP_DURABLE);
$amqpQueue->declareQueue();
$amqpQueue->bind($exchange_name, $route_key);
$callbackManual = function (AMQPEnvelope $envelope, AMQPQueue $queue) {
echo $envelope->getBody(), PHP_EOL;
// 可通过注释、打开此行代码,查看web管理界面队列消息状态
// $queue->ack($envelope->getDeliveryTag());
// 可通过注释、打开此行代码,查看web管理界面队列消息状态
// $queue->nack($envelope->getDeliveryTag(), AMQP_NOWAIT);
};
$callbackAuto = function (AMQPEnvelope $envelope, AMQPQueue $queue) {
echo $envelope->getBody(), PHP_EOL;
};
// 阻塞模式接收消息
while (true) {
// 手动ACK应答确认
$amqpQueue->consume($callbackManual);
// 自动ACK应答确认
// $amqpQueue->consume($callbackAuto, AMQP_AUTOACK);
}
$conn->disconnect();使用PhpAmqpLib库
- producer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
try {
$routingKey = 'confirmRoute';
$exchange = 'confirmExchange';
$queue = 'confirmQueue';
$channel->exchange_declare($exchange, 'direct');
$channel->queue_declare($queue, false, true, false, false, false);
$channel->queue_bind($queue, $exchange, $routingKey);
// confirm机制
// $channel->confirm_select();
// $channel->set_ack_handler(function (AMQPMessage $msg){
// echo 'send success: ' . $msg->getBody(),PHP_EOL;
// });
// $channel->set_nack_handler(function (AMQPMessage $msg){
// echo 'send fail: ' . $msg->getBody(),PHP_EOL;
// });
// return机制
// $channel->set_return_listener(function ($reply_code, $reply_text, $exchange, $routing_key, $properties){
// echo 'message cannot reach queue',PHP_EOL;
// echo $reply_code,PHP_EOL;
// echo $reply_text,PHP_EOL;
// });
$message = new AMQPMessage('hello world', ['delivery_mode'=>2]);
$channel->basic_publish($message, $exchange, $routingKey, true);
// $channel->wait_for_pending_acks(2);
// $errorRoute = 'errorRoute';
// $channel->basic_publish($message, $exchange, $errorRoute, true, false);
// while(1) {
// $channel->wait();
// }
} catch (\Exception $e){
echo "exception: " . $e->getMessage();
}
$channel->close();
$connection->close();- consumer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41require 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$exchangeName = 'confirmRoute';
$queueName = 'confirmExchange';
$routingKey = 'confirmQueue';
$conn = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/', true);
$channel = $conn->channel();
// 声明队列再消费
$channel->queue_declare($routingKey, false, true, false, false, false);
// 不声明队列直接消费(前提是队列已存在)
// none code
$channel->basic_qos(null, 1, null);
$callback = function ($msg) {
echo "Received: ", $msg->body, PHP_EOL;
// $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
// $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], true, true);
// $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], true);
// $msg->delivery_info['channel']->basic_cancel($msg->delivery_info['delivery_tag']);
// $msg->nack(true);
// $msg->ack(true);
$msg->reject(true);
sleep(1);
};
$channel->basic_consume($routingKey, '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
// sleep(1);
}