0%

Redis之实现消息队列的几种方式

消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。

一、概念

      队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。队列中没有元素时,称为空队列。它是一种先进先出的数据结构,即操作队列元素时有顺序性。

      消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。目前使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ,以及Redis。

二、实现方式

  1. 基于List
    • 非阻塞命令LPUSH/RPOP
    • 阻塞命令BRPOP

此处的阻塞与非阻塞理解:基于List实现的消息队列,消费者端一般写个死循环(while(true))不断的从队列中拉取消息;当队列为空时,消费者端使用RPOP时仍然会不断拉取消息,进而造成CPU空跑浪费资源;BRPOP则会阻塞,当队列不为空时会重新拉取(在设置的超时时间范围内)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 客户端1
127.0.0.1:6379> lpush mq-list 1 2 3
(integer) 3
127.0.0.1:6379> type mq-list
list
127.0.0.1:6379> llen mq-list
(integer) 3

# 客户端2
127.0.0.1:6379> rpop mq-list
"1"
127.0.0.1:6379> rpop mq-list
"2"
127.0.0.1:6379> rpop mq-list
"3"
127.0.0.1:6379> rpop mq-list
(nil)


# 客户端1
127.0.0.1:6379> llen mq-list
(integer) 0

# 客户端2
127.0.0.1:6379> brpop mq-list 0

# 客户端1
127.0.0.1:6379> lpush mq-list 222
(integer) 1

# 客户端2
127.0.0.1:6379> brpop mq-list 0
1) "mq-list"
2) "222"
(17.27s)
  1. 基于Pub/Sub
1
2
3
4
5
6
7
8
9
10
11
12
13
# 客户端1
127.0.0.1:6379> subscribe mq-ps
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "mq-ps"
3) (integer) 1
1) "message"
2) "mq-ps"
3) "hello world"

# 客户端2
127.0.0.1:6379> publish mq-ps "hello world"
(integer) 1
  1. 基于Zset

    • 根据score值的有序性可实现顺序消费的消息队列
  2. 基于Stream(5.0+版本支持)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 客户端1
127.0.0.1:6379> xadd mq-stream * name liusir
"1618810421154-0"
127.0.0.1:6379> xadd mq-stream * name liuliu
"1618810463678-0"

# 客户端2
127.0.0.1:6379> xread count 5 STREAMS mq-stream 0-0
1) 1) "mq-stream"
2) 1) 1) "1618810421154-0"
2) 1) "name"
2) "liusir"
2) 1) "1618810463678-0"
2) 1) "name"
2) "liuliu"

三、参考

  1. 参考一
  2. 参考二