redis 流在消费者组内有序处理
redis streams ordered processing within consumer groups
我正在使用 python (aioredis) 和 redis 流。
我有一个生产者 - 许多(分组)消费者场景,我想确保消费者正在处理以有序方式发送到流的(批量)消息,这意味着:当第一条消息完成时,处理流中的下一条消息等等。这也意味着消费者组中的一个消费者正在处理,而其他消费者将等待。
我还想依赖第二个、第三个等等消费者组中的有序处理——所有这些都依赖于发送到一个流的相同消息。含义:
message 1 ... n -> stream1
ordered processing within group 1 ... n
whereas consumer 1 ... n per group 1 ... n
当我还想确保每个组的潜在订单检查逻辑不会过载时,完成此操作的好方法是什么?
让我回到老派同步处理,如果你想顺序处理流消息并不容易,原因是failure/retry。
考虑到你想处理每条消息最多一次,流消息执行是一个关键部分,消费者组成员为threads/processes。
要同步这个,你需要有某种锁定机制,给定的消费者组可能 运行 在不同的机器上。您可以使用全局锁定机制来防止多个消费者使用来自同一流的消息。
可以使用Redis锁(RedLock)来acquire/release锁。
伪代码
Procedure SequentialProcessor
Input: StreamName
Input: ConsumerName
Input: ConsumerGroup
Input: LockTime
BEGIN
redLock = RedLock()
WHILE True DO
IF redLock.aquireLock(StreamName#ConsumerGroup, LockTime) THEN
message = redis.XREADGROUP( ConsumerGroup, StreamName, ...)
TRY
processMessage( message )
FINALLY
redLock.releaseLock( StreamName#ConsumerGroup )
ENDIF
END
END
我正在使用 python (aioredis) 和 redis 流。
我有一个生产者 - 许多(分组)消费者场景,我想确保消费者正在处理以有序方式发送到流的(批量)消息,这意味着:当第一条消息完成时,处理流中的下一条消息等等。这也意味着消费者组中的一个消费者正在处理,而其他消费者将等待。
我还想依赖第二个、第三个等等消费者组中的有序处理——所有这些都依赖于发送到一个流的相同消息。含义:
message 1 ... n -> stream1
ordered processing within group 1 ... n
whereas consumer 1 ... n per group 1 ... n
当我还想确保每个组的潜在订单检查逻辑不会过载时,完成此操作的好方法是什么?
让我回到老派同步处理,如果你想顺序处理流消息并不容易,原因是failure/retry。
考虑到你想处理每条消息最多一次,流消息执行是一个关键部分,消费者组成员为threads/processes。
要同步这个,你需要有某种锁定机制,给定的消费者组可能 运行 在不同的机器上。您可以使用全局锁定机制来防止多个消费者使用来自同一流的消息。
可以使用Redis锁(RedLock)来acquire/release锁。
伪代码
Procedure SequentialProcessor
Input: StreamName
Input: ConsumerName
Input: ConsumerGroup
Input: LockTime
BEGIN
redLock = RedLock()
WHILE True DO
IF redLock.aquireLock(StreamName#ConsumerGroup, LockTime) THEN
message = redis.XREADGROUP( ConsumerGroup, StreamName, ...)
TRY
processMessage( message )
FINALLY
redLock.releaseLock( StreamName#ConsumerGroup )
ENDIF
END
END