0%

RabbitMQ断线重连

夏后殷商西东周,春秋战国秦皇收,西汉东汉魏蜀吴,西晋东晋兼五胡。匈奴羯氏羌慕容,拓跋代北后称雄。宋齐梁陈是南朝,北魏齐周称北朝。北周灭齐传於隋,隋又灭陈再统一。隋灭唐兴称富强,五代十国各称王。契丹兴起在北方,建号为辽入汴梁。五代梁唐晋汉周,宋朝建国陈桥头。女真建金先灭辽,打破汴京北宋消。南宋偏安在江南,蒙古兴起国号元。灭金灭宋归一统,元朝统治九十年。明代共传十六君,满洲初起号后金。后金国号改为清,入关称帝都北京。人民觉悟革命起,清帝退位民国立。人民民主再胜利,齐心奔向共产国。

一、概念

  1. 断线重连是指由于网络波动造成用户间歇性的断开与服务器的连接,待网络恢复之后服务器尝试将用户连接到上次断开时的状态和数据。具体到RabbitMQ,就是指连接断开后,尝试重新连接到同一连接进而继续进行生产和消费消息。

二、实现

      默认情况下,RabbitMQ只在Java和.NET实现了此机制,其他语言需要根据自己的场景自行实现。具体到PHP语言,composer包php-amqplib中README.md有此描述:

1
2
3
4
5
6
7
8
9
## Connection recovery ##

Some RabbitMQ clients using automated connection recovery mechanisms to reconnect
and recover channels and consumers in case of network errors.

Since this client is using a single-thread, you can set up connection recovery
using exception handling mechanism.

Exceptions which might be thrown in case of connection errors:

PhpAmqpLib\Exception\AMQPConnectionClosedException
PhpAmqpLib\Exception\AMQPIOException
\RuntimeException
\ErrorException

1
2
3
4
5
6

Some other exceptions might be thrown, but connection can still be there. It's
always a good idea to clean up an old connection when handling an exception
before reconnecting.

For example, if you want to set up a recovering connection:

$connection = null;
$channel = null;
while(true){
try {
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
// Your application code goes here.
do_something_with_connection($connection);
} catch(AMQPRuntimeException $e) {
echo $e->getMessage();
cleanup_connection($connection);
usleep(WAIT_BEFORE_RECONNECT_uS);
} catch(\RuntimeException $e) {
cleanup_connection($connection);
usleep(WAIT_BEFORE_RECONNECT_uS);
} catch(\ErrorException $e) {
cleanup_connection($connection);
usleep(WAIT_BEFORE_RECONNECT_uS);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

A full example is in `demo/connection_recovery_consume.php`.

This code will reconnect and retry the application code every time the
exception occurs. Some exceptions can still be thrown and should not be handled
as a part of reconnection process, because they might be application errors.

This approach makes sense mostly for consumer applications, producers will
require some additional application code to avoid publishing the same message
multiple times.

This was a simplest example, in a real-life application you might want to
control retr count and maybe gracefully degrade wait time to reconnection.

You can find a more excessive example in [#444](https://github.com/php-amqplib/php-amqplib/issues/444)

三、参考

  1. 官网
  2. 官网issue
  3. php-amqplib示例
  4. php-amqplib其他示例