redis 流在消费者组内有序处理

redis streams ordered processing within consumer groups

我正在使用 python (aioredis) 和 redis 流。

我有一个生产者 - 许多(分组)消费者场景,我想确保消费者正在处理以有序方式发送到流的(批量)消息,这意味着:当第一条消息完成时,处理流中的下一条消息等等。这也意味着消费者组中的一个消费者正在处理,而其他消费者将等待。

我还想依赖第二个、第三个等等消费者组中的有序处理——所有这些都依赖于发送到一个流的相同消息。含义:

message 1 ... n -> stream1 
ordered processing within group 1 ... n  
whereas consumer 1 ... n per group 1 ... n

当我还想确保每个组的潜在订单检查逻辑不会过载时,完成此操作的好方法是什么?

让我回到老派同步处理,如果你想顺序处理流消息并不容易,原因是failure/retry。

考虑到你想处理每条消息最多一次,流消息执行是一个关键部分,消费者组成员为threads/processes。

要同步这个,你需要有某种锁定机制,给定的消费者组可能 运行 在不同的机器上。您可以使用全局锁定机制来防止多个消费者使用来自同一流的消息。

可以使用Redis锁(RedLock)来acquire/release锁。

伪代码

Procedure SequentialProcessor

Input: StreamName
Input: ConsumerName
Input: ConsumerGroup
Input: LockTime 


BEGIN
    redLock = RedLock()
    WHILE True DO
     IF redLock.aquireLock(StreamName#ConsumerGroup, LockTime) THEN
       message = redis.XREADGROUP( ConsumerGroup, StreamName, ...)
       TRY
         processMessage( message )
       FINALLY
          redLock.releaseLock( StreamName#ConsumerGroup )
     ENDIF
    END
END