伫倚危楼风细细,望极春愁,黯黯生天际。草色烟光残照里,无言谁会凭栏意。拟把疏狂图一醉,对酒当歌,强乐还无味。衣带渐宽终不悔,为伊消得人憔悴。 —— 北宋.柳永 《蝶恋花·伫倚危楼风细细》
连接异常
一、基础
断线重连:断线重连是指由于网络波动造成用户间歇性的断开与服务器的连接,待网络恢复之后服务器尝试将用户连接到上次断开时的状态和数据。具体到RabbitMQ,就是指连接断开后,尝试重新连接到同一连接进而继续进行生产和消费消息。
默认情况下,RabbitMQ只在Java和.NET实现了此机制,其他语言需要根据自己的场景自行实现。具体到PHP语言,composer包php-amqplib
中README.md有此描述:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
## Connection recovery ##
Some RabbitMQ clients using automated connection recovery mechanisms to reconnect
and recover channels and consumers in case of network errors.
Since this client is using a single-thread, you can set up connection recovery
using exception handling mechanism.
Exceptions which might be thrown in case of connection errors:
PhpAmqpLib\Exception\AMQPConnectionClosedException
PhpAmqpLib\Exception\AMQPIOException
\RuntimeException
\ErrorException
Some other exceptions might be thrown, but connection can still be there. It's
always a good idea to clean up an old connection when handling an exception
before reconnecting.
For example, if you want to set up a recovering connection:
$connection = null;
$channel = null;
while(true){
try {
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
// Your application code goes here.
do_something_with_connection($connection);
} catch(AMQPRuntimeException $e) {
echo $e->getMessage();
cleanup_connection($connection);
usleep(WAIT_BEFORE_RECONNECT_uS);
} catch(\RuntimeException $e) {
cleanup_connection($connection);
usleep(WAIT_BEFORE_RECONNECT_uS);
} catch(\ErrorException $e) {
cleanup_connection($connection);
usleep(WAIT_BEFORE_RECONNECT_uS);
}
}
A full example is in `demo/connection_recovery_consume.php`.
This code will reconnect and retry the application code every time the
exception occurs. Some exceptions can still be thrown and should not be handled
as a part of reconnection process, because they might be application errors.
This approach makes sense mostly for consumer applications, producers will
require some additional application code to avoid publishing the same message
multiple times.
This was a simplest example, in a real-life application you might want to
control retr count and maybe gracefully degrade wait time to reconnection.
You can find a more excessive example in [#444](https://github.com/php-amqplib/php-amqplib/issues/444)
二、参考
消息异常
消息丢失
- 生产者发送丢失
- 未送达exchange
- 未路由到queue
- 消息未持久化
- 消费者消费丢失
- 生产者发送丢失
消息积压:由于某些原因如消费端宕机、消费端消费能力不足、生产端发送流量过大等会产生消息积压的情况。
- 限制生产者生产速度
- 提升消费能力,如加机器、升配、消费策略等
- 将队列消息持久化到DB慢慢处理
消息乱序(时序性):消息队列存在的意义之一就是在高并发场景下,将所有请求加入队列。请求按排队顺序依次进行处理,第一步没有问题。接下来进入到业务层(如订单处理),生产者会产生多条异步处理消息,如修改订单状态:待支付->已支付->已取消,然后消息通过exchange会分发到不同的队列中(这种情况可以在代码里面进行控制,即把对消费顺序有要求的消息投放到同一个队列中),只要有多个消费者处理队列,都有可能会产生不同的消费顺序,从而导致业务上的漏洞。
- 一个Queue对应一下Consumer
- 消费者进行业务判断,前提是生产者有记录
消息重复(幂等性):由于各种原因(网络延迟、脚本异常终止),消费者出现异常或者消费者延迟消费,会造成进行MQ重试。在重试过程中,可能会造成消息重复消费,即出现非幂等性。保证消息消费时的幂等性,即保证消息不被重复消费。
- 基于全局MsgId或业务Id
- 基于redis缓存实现