能改的,叫做缺点,不能改的,叫做弱点。 —— 当年明月《明朝那些事儿》
一、基础
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现,支持多种客户端,如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
Erlang是一种通用的面向并发的编程语言,它由瑞典电信设备制造商爱立信所辖的CS-Lab开发,目的是创造一种可以应对大规模并发活动的编程语言和运行环境。Erlang问世于1987年,经过十年的发展,于1998年发布开源版本。
Erlang是运行于虚拟机的解释性语言,但是现在也包含有乌普萨拉大学高性能Erlang计划(HiPE)开发的本地代码编译器,自R11B-4版本开始,Erlang也开始支持脚本式解释器。在编程范型上,Erlang属于多重范型编程语言,涵盖函数式、并发式及分布式。顺序执行的Erlang是一个及早求值,单次赋值和动态类型的函数式编程语言。Erlang是一个结构化,动态类型编程语言,内建并行计算支持。最初是由爱立信专门为通信应用设计的,比如控制交换机或者变换协议等,因此非常适合于构建分布式,实时软并行计算系统。使用Erlang编写出的应用运行时通常由成千上万个轻量级进程组成,并通过消息传递相互通讯。进程间上下文切换对于Erlang来说仅仅只是一两个环节,比起C程序的线程切换要高效得多得多了。
二、原理
- 结构图
- Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
- Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
- Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
- Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
- Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
发送消息
- 生产者和Broker建立TCP连接
- 生产者和Broker建立通道
- 生产者通过通道消息发送给Broker,由Exchange将消息进行转发
- Exchange将消息转发到指定的Queue(队列)
接收消息
- 消费者和Broker建立TCP连接
- 消费者和Broker建立通道
- 消费者监听指定的Queue(队列)
- 当有消息到达Queue时Broker默认将消息推送给消费者
- 消费者接收到消息
三、一些概念
消息确认
- 消息确认机制默认是关闭的,开启只需在消费者消耗消息方法
basic_consume
中将第四个参数设置为false
,并在消息处理完毕后返回给生产者一个确认标识即可。 - 列出队列中未处理和未确认的消息
rabbitmqctl list_queues name messages_ready messages_unacknowledged
- 消息确认机制默认是关闭的,开启只需在消费者消耗消息方法
消息持久化
- 将(生产者和消费者)声明队列方法
queue_declare
中将第三个参数设置为true
- 生产者实例化消息时第二个可选参数设置为
delivery_mode = 2
1
2
3$msg = new AMQPMessage(
$data, array('delivery_mode' => 2)
);- 将(生产者和消费者)声明队列方法
合理分发消息
- 消耗端设置
$channel->basic_qos(null, 1, null)
,每个消耗端只处理一个消息
- 消耗端设置
生产者
消费者
- RabbitMQ消费者获取消息的方式
- Push
- Pull
- RabbitMQ消费者获取消息的方式
消息队列
交换机
- 分类
fanout
:所有队列都可消费消息direct
:根据不同需求绑定不同的key
到不同消息队列,生产者生产消息时通过传入不同的key
区分消息类型- 当所有消息队列绑定的
key
相同时,和fanout
等价
- 当所有消息队列绑定的
topic
* (star)
星号代替一个单词# (hash)
#号代替零个或多个单词- 不用
* #
时和direct
等价 - 只用
#
时和fanout
等价 - 将交换机声明为
topic
,生产者生产消息时传入不同的key
,消费者通过星号和#号匹配对应的消息队列以消费
headers
- 分类
四、实战
环境搭建
初始化项目
- 创建一个rabbit目录,在此目录下创建composer.json文件,文件内容如下
1
2
3
4
5{
"require":{
"php-amqplib/php-amqplib": ">=2.6.1"
}
}- 然后通过composer install 命令加载所需要的库
代码
- 创建消息发布脚本 send.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();- 创建消息消费脚本 recieve.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();运行(已经配置好php环境变量)
- 在rabbit目录开两个命令窗口,分别执行 php recieve.php 和 php send.php 命令
结果
- 执行 php recieve.php 后结果:
- 执行 php send.php 后结果: