Redis消息队列

Reids 实现消息队列存在几种方式:

  • Redis的列表(lists)数据结构

  • Redis自带的PUB/SUB机制,即发布-订阅模式

  • Stream流结构

本文主要介绍通过 Stream 流的方式

1. 生产&消费

使用stream进行消费有两种情况:

  • 只有一个消费者的独立消费

  • 多消费者的消费者组

1.1. 独立消费

1

类似于List,生产者往list中写数据,消费者从list中读数据,只能有一个消费者。

生产消息

  • redis

  • python

消费消息

  • 阻塞模式

  • 非阻塞模式

案例 python代码实现

生产者

消费者

消费后并不会删除队列中的消息。如果想连续消费,需要不断传入 ID 值

1.2. 多消费者

此时存在两个概念,需要区分开。

  • 消费组

    一个消费队列可以有多个消费组,消费组之间互不影响. 每个消费组(Consumer Group)的状态都是独立的,也就是说同一份Stream内部的消息会被每个消费组都消费到。

  • 消费者

    一个消费组中可以存在多个消费者,这些消费者将共同消费此时的队列。这些消费者之间是竞争关系

2

上图是消费者组的简单流程:

  1. 创建消费组

  2. 创建消费者 并使用 XREADGROUP GROUP ... > 进行消费。此时消费的消息进入 pending 队列中。

  3. 当向服务器发送 每条消息的ACK 时,此消息被认为成功消费,由 pending 删除。

创建消费组

创建消费者 & 消费

test_g1 是刚才创建的消费组,consumer_1 是新建的消费者 > 表示从 last_delivered_id 开始消费,每当消费者读取一条消息,last_delivered_id变量就会前进.

如果使用ID是任何其他有效的数字ID,则该命令将允许我们访问没有被ACK的消息(即处于pending中的消息)。

pendding队列

为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。

消费者首先根据 last_delivered_id 读取消息,然后将消息放到 pendding 队列中。当用户提交 ACK 时,表示该消息完全处理完,将pendding 中的消息删除。

XREADGROUP GROUP test_g1 consumer_1 count 1 streams "test" ID 使用ID时,将会从pendding中读取内容。

XACK

XACK将消息标记为已处理,其他的消费者将不会在处理该消息。同时将pending中的内容删除。但是此消息仍然保留在redis中。

案例

生产者

消费者

先遍历pending中内容,然后遍历流中的内容

consumer1 https://gist.github.com/f8fea383d367df1eb25bb359df2d17e5

探讨ack与last_delievered_id

  1. 创建一个组 xgroup create consumer g1 0-0

  2. 查看以下pending

  3. 消费2条消息 XREADGROUP GROUP g1 c1 count 1 STREAMS consumer >

  4. 再次查看pending

  5. 重置 last_delivered_id

  6. 查看pendign信息

  7. 重新消费

  8. 查看队列

  9. 查看pending中的内容

  10. ack num 0

  11. 查看pending

  12. 重置 last_delivered_id

  13. 消费消息

  14. 查看队列

2. 命令速查

2.1. XADD

生产消息

2.2. XREAD

非阻塞模式

阻塞模式

python

2.3. XINFO

查看当前流的信息

查看当流的组的信息

查看流中组的消费者信息

2.4. XRANGE

遍历消息

2.5. XGROUP

创建消费者组

删除组

删除消费者

设置 last_delivered_id

2.6. XREADGROUP

创建消费者并消费

2.7. XACK

确认消息

2.8. XDEL

删除消息

2.9. XLEN

查看流的长度

2.10. XTRIM

设置流的最大长度

依据先进先出的原则,自动删除超出最长长度的消息

Last updated