0%

RabbitMQ基础

能改的,叫做缺点,不能改的,叫做弱点。 —— 当年明月《明朝那些事儿》

一、基础

  1. RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现,支持多种客户端,如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

  2. AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

  3. Erlang是一种通用的面向并发的编程语言,它由瑞典电信设备制造商爱立信所辖的CS-Lab开发,目的是创造一种可以应对大规模并发活动的编程语言和运行环境。Erlang问世于1987年,经过十年的发展,于1998年发布开源版本。

Erlang是运行于虚拟机的解释性语言,但是现在也包含有乌普萨拉大学高性能Erlang计划(HiPE)开发的本地代码编译器,自R11B-4版本开始,Erlang也开始支持脚本式解释器。在编程范型上,Erlang属于多重范型编程语言,涵盖函数式、并发式及分布式。顺序执行的Erlang是一个及早求值,单次赋值和动态类型的函数式编程语言。Erlang是一个结构化,动态类型编程语言,内建并行计算支持。最初是由爱立信专门为通信应用设计的,比如控制交换机或者变换协议等,因此非常适合于构建分布式,实时软并行计算系统。使用Erlang编写出的应用运行时通常由成千上万个轻量级进程组成,并通过消息传递相互通讯。进程间上下文切换对于Erlang来说仅仅只是一两个环节,比起C程序的线程切换要高效得多得多了。

二、原理

  1. 结构图

RabbitMQ

  • Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
  • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
  • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
  1. 发送消息

    • 生产者和Broker建立TCP连接
    • 生产者和Broker建立通道
    • 生产者通过通道消息发送给Broker,由Exchange将消息进行转发
    • Exchange将消息转发到指定的Queue(队列)
  2. 接收消息

    • 消费者和Broker建立TCP连接
    • 消费者和Broker建立通道
    • 消费者监听指定的Queue(队列)
    • 当有消息到达Queue时Broker默认将消息推送给消费者
    • 消费者接收到消息

三、一些概念

  1. 消息确认

    • 消息确认机制默认是关闭的,开启只需在消费者消耗消息方法basic_consume中将第四个参数设置为false,并在消息处理完毕后返回给生产者一个确认标识即可。
    • 列出队列中未处理和未确认的消息rabbitmqctl list_queues name messages_ready messages_unacknowledged
  2. 消息持久化

    • 将(生产者和消费者)声明队列方法queue_declare中将第三个参数设置为true
    • 生产者实例化消息时第二个可选参数设置为delivery_mode = 2
    1
    2
    3
    $msg = new AMQPMessage(
    $data, array('delivery_mode' => 2)
    );
  3. 合理分发消息

    • 消耗端设置$channel->basic_qos(null, 1, null),每个消耗端只处理一个消息
  4. 生产者

  5. 消费者

    • RabbitMQ消费者获取消息的方式
      • Push
      • Pull
  6. 消息队列

  7. 交换机

    • 分类
      • fanout:所有队列都可消费消息
      • direct:根据不同需求绑定不同的key到不同消息队列,生产者生产消息时通过传入不同的key区分消息类型
        • 当所有消息队列绑定的key相同时,和fanout等价
      • topic
        • * (star) 星号代替一个单词
        • # (hash) #号代替零个或多个单词
        • 不用* #时和direct等价
        • 只用#时和fanout等价
        • 将交换机声明为topic,生产者生产消息时传入不同的key,消费者通过星号和#号匹配对应的消息队列以消费
      • headers

四、实战

  1. 环境搭建

  2. 初始化项目

    • 创建一个rabbit目录,在此目录下创建composer.json文件,文件内容如下
    1
    2
    3
    4
    5
    {
    "require":{
    "php-amqplib/php-amqplib": ">=2.6.1"
    }
    }
    • 然后通过composer install 命令加载所需要的库
  3. 代码

    • 创建消息发布脚本 send.php
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    require_once __DIR__ . '/vendor/autoload.php';

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $channel->queue_declare('hello', false, false, false, false);
    $msg = new AMQPMessage('Hello World!');
    $channel->basic_publish($msg, '', 'hello');

    echo " [x] Sent 'Hello World!'\n";
    $channel->close();
    $connection->close();
    • 创建消息消费脚本 recieve.php
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    require_once __DIR__ . '/vendor/autoload.php';

    use PhpAmqpLib\Connection\AMQPStreamConnection;

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $channel->queue_declare('hello', false, false, false, false);
    echo " [*] Waiting for messages. To exit press CTRL+C\n";

    $callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    };
    $channel->basic_consume('hello', '', false, true, false, false, $callback);
    while (count($channel->callbacks)) {
    $channel->wait();
    }
    $channel->close();
    $connection->close();
  4. 运行(已经配置好php环境变量)

    • 在rabbit目录开两个命令窗口,分别执行 php recieve.php 和 php send.php 命令
  5. 结果

    • 执行 php recieve.php 后结果:

    recieve.php

    • 执行 php send.php 后结果:

    send.php

    recieve.php

五、参考

  1. 参考一
  2. 参考二
  3. 参考三
  4. 参考四