事件中心——消费者横向扩展时如何防止重复处理

Event Hub -- how to prevent duplicate handling when consumers scale out

当我们有多个事件中心(或任何消息服务)的消费者时,如何确保没有消息被处理两次,尤其是在消费者自动扩展到多个实例的情况下?

我知道我们可以跟踪处理的最后一条消息,但话又说回来,在检查消息是否已处理和实际处理之间,其他实例已经可以处理它了(竞争条件?。

那么,如何以可扩展的方式解决这个问题?

[更新] 我知道有一个建议,即至少有与消费者一样多的分区,但是如果单个消费者无法处理定向到它的消息但需要扩展到多个实例,该怎么办?

每个处理器租用一个分区,请参阅the docs

An event processor instance typically owns and processes events from one or more partitions. Ownership of partitions is evenly distributed among all the active event processor instances associated with an event hub and consumer group combination.

因此向外扩展不会导致重复的消息处理,因为新处理器无法租用已由另一个处理器处理的分区。

那么,关于您的评论:

i am aware there is a recommendation to have at least as many partitions as there are consumers

反之亦然:建议有多少分区就有多少消费者。如果您的消费者多于分区,消费者将相互竞争以获得分区上的锁。

现在,关于重复消息,由于事件中心 guarantees at-least-once delivery,您无法采取太多措施来防止这种情况发生。提供至多一次交付的可扩展服务并不多,我知道如果您确实需要,Azure 服务总线队列确实提供了此服务。

可能会出现什么会导致重复邮件处理的问题。好吧,当处理消息时,处理器会做一些 checkpointing:偶尔它会将其位置存储在分区事件序列中(请记住,分区绑定到单个处理器)。现在,当处理器实例在两个检查点事件之间崩溃时,一个新实例将从最后一个检查点的位置恢复处理消息。这很可能会导致旧消息被再次处理。

If a reader disconnects from a partition, when it reconnects it begins reading at the checkpoint that was previously submitted by the last reader of that partition in that consumer group.

所以,这意味着您需要确保您的处理逻辑是幂等的。怎么样,这取决于你,因为我不知道你的用例。

一个选项是跟踪每条单独的消息以查看它是否已被处理。如果您没有要检查的唯一 ID,也许您可​​以生成整个消息的哈希值并与之进行比较。