0%

Redis之stream

Redis5.0新增了Stream类型,从字面上看是流类型,但其实从功能上看应该是Redis对消息队列的完善实现。

一、概念

      Redis5.0新增了Stream类型,从字面上看是流类型,但其实从功能上看应该是Redis对消息队列(MQ,Message Queue)的完善实现。用过Redis做消息队列的都了解,基于Reids的消息队列实现有很多种,如:基于List,基于Pub/Sub,基于Zset,基于Stream,其中基于Stream是一种比较完善的消息队列实现,完善是指可重复消费(List/Zset)以及消息是否丢失(Pub/Sub)。

  1. 底层结构(src/stream.h)
1
2
3
4
5
6
7
8
9
10
11
typedef struct streamID {
uint64_t ms; /* Unix time in milliseconds. */
uint64_t seq; /* Sequence number. */
} streamID;

typedef struct stream {
rax *rax; /* The radix tree holding the stream. */
uint64_t length; /* Number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */
rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
} stream;

二、使用

  1. 生产消息,命令格式:xadd key ID field value [field value ...]
    • key为消息队列的名字
    • ID为消息ID,可自定义,一般推荐是用*由Redis自动生成(格式为当前时间抽的毫秒-序号)
    • field value为消息内容,可设置多个
1
2
3
4
127.0.0.1:6379> xadd mq-stream * name liusir age 10 addr beijing
"1618887396417-0"
127.0.0.1:6379> xadd mq-stream * name liuliu age 10 addr beijing
"1618887444162-0"
  1. 获取消息长度,命令格式:xlen key
1
2
127.0.0.1:6379> xlen mq-stream
(integer) 2
  1. 获取消息列表,命令格式:xrange key start end [COUNT count]
    • start :开始值,特殊值-表示最小值
    • end :结束值,特殊值+表示最大值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
127.0.0.1:6379> xrange mq-stream - +
1) 1) "1618887396417-0"
2) 1) "name"
2) "liusir"
3) "age"
4) "10"
5) "addr"
6) "beijing"
2) 1) "1618887444162-0"
2) 1) "name"
2) "liuliu"
3) "age"
4) "10"
5) "addr"
6) "beijing"

127.0.0.1:6379> xrange mq-stream 1618887444162-0 1618890355158-0
1) 1) "1618887444162-0"
2) 1) "name"
2) "liuliu"
3) "age"
4) "10"
5) "addr"
6) "beijing"
2) 1) "1618890355158-0"
2) 1) "name"
2) "liusirdotme"
3) "age"
4) "20"
5) "weight"
6) "80kg"
  1. 读取消息,命令格式:xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] i,大写均为固定参数,小写为可变参数
    • [COUNT count],选填项,用于限定获取的消息数量,默认全部读取
    • [BLOCK milliseconds],选填项,用于设置XREAD为阻塞模式,默认为非阻塞模式
      • 非阻塞模式下,读取完毕(即使没有任何消息)立即返回,
      • 阻塞模式下,若读取不到内容,则阻塞等待,需要设置阻塞时长。
    • key为消息队列名字
    • i为消息ID,使用0表示从第一条开始读取
      • 特殊的ID:$,阻塞模式下表示读取最新的消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
127.0.0.1:6379> xread STREAMS mq-stream 0
1) 1) "mq-stream"
2) 1) 1) "1618887396417-0"
2) 1) "name"
2) "liusir"
3) "age"
4) "10"
5) "addr"
6) "beijing"
2) 1) "1618887444162-0"
2) 1) "name"
2) "liuliu"
3) "age"
4) "10"
5) "addr"
6) "beijing"

127.0.0.1:6379> xread STREAMS mq-stream 1618887396417-0
1) 1) "mq-stream"
2) 1) 1) "1618887444162-0"
2) 1) "name"
2) "liuliu"
3) "age"
4) "10"
5) "addr"
6) "beijing"
  1. 管理消费者组,命令格式:xgroup [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
    • CREATE key groupname id-or-$为消息队列key创建消费组groupname
      • 特殊ID:0,表示从第一条消息开始消费。
    • SETID key groupname id-or-$为消息队列key设置消费组groupname
    • DESTROY key groupname销毁消费组groupname
    • DELCONSUMER key groupname consumername
1
2
127.0.0.1:6379> xgroup CREATE mq-stream mq-groupname 0
OK
  1. 读取消费组消息,命令格式:xreadgroup GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
    • 特殊ID:>,代表从未发送给任何其他消费者的消息,即新消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
127.0.0.1:6379> xreadgroup GROUP mq-groupname consumerA STREAMS mq-stream >
1) 1) "mq-stream"
2) 1) 1) "1618887444162-0"
2) 1) "name"
2) "liuliu"
3) "age"
4) "10"
5) "addr"
6) "beijing"
2) 1) "1618890355158-0"
2) 1) "name"
2) "liusirdotme"
3) "age"
4) "20"
5) "weight"
6) "80kg"

127.0.0.1:6379> xreadgroup GROUP mq-groupname consumerA STREAMS mq-stream 1618887444162-0
1) 1) "mq-stream"
2) 1) 1) "1618890355158-0"
2) 1) "name"
2) "liusirdotme"
3) "age"
4) "20"
5) "weight"
6) "80kg"
  1. 查看消息队列信息,命令格式:xinfo [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

  2. 消费者组读取xreadgroup后,并没有被实际消费,在pending列表放着,读取pending列表,命令格式:xpending key group [start end count] [consumer]

1
2
3
4
5
6
7
8
127.0.0.1:6379> xpending mq-stream group1
1) (integer) 5
2) "1618901317475-0"
3) "1618902545157-0"
4) 1) 1) "xiaofei1"
2) "4"
2) 1) "xiaofei2"
2) "1"
  1. 消费pending列表消息,命令格式:xack key group ID [ID ...]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
127.0.0.1:6379> xpending mq-stream group1
1) (integer) 5
2) "1618901317475-0"
3) "1618902545157-0"
4) 1) 1) "xiaofei1"
2) "4"
2) 1) "xiaofei2"
2) "1"

127.0.0.1:6379> xack mq-stream group1 1618901317475-0
(integer) 1
127.0.0.1:6379> xpending mq-stream group1
1) (integer) 4
2) "1618901426193-0"
3) "1618902545157-0"
4) 1) 1) "xiaofei1"
2) "3"
2) 1) "xiaofei2"
2) "1"

127.0.0.1:6379> xack mq-stream group1 1618901426193-0
(integer) 1
127.0.0.1:6379> xpending mq-stream group1
1) (integer) 3
2) "1618901471897-0"
3) "1618902545157-0"
4) 1) 1) "xiaofei1"
2) "2"
2) 1) "xiaofei2"
2) "1"
  1. 删除消息,命令格式:xdel key ID [ID ...]

  2. 转移消息,命令格式:xclaim key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid]

三、参考

  1. 参考一
  2. 参考二