kafka如何使用topic/partition/offset实现exactly-once消息传递逻辑

kafka how to implement exactly-once message delivery logic with topic/partition/offset

我设法在使用@KafkaListener 注释的方法中获得了 topic/partition/offset,但是我如何使用这些数据来实现 exactly-once 消费者逻辑?

我正在使用 ConcurrentKafkaListenerContainerFactory,设置为 concurrenc=4,并将 AckMode 设置为 MANUAL。 我目前的方式是使用redis去重: 我使用topic:partition作为redis的键,偏移量作为它的值,然后将即将到来的偏移量与redis中的值进行比较,如果偏移量比redis更新(更大),那么继续业务逻辑,否则我忽略信息。最后提交偏移量(ack.acknowledge())

但这种方式行不通,例如,如果重新平衡发生在 ack.acknowledge() 完成之前,则会出现此错误:org.apache.kafka.clients.consumer.CommitFailedException,

并且rebalancing后,原来的partition被分配给了另一个线程,导致同一条消息会被消费两次。

所以总而言之,如何设计一个逻辑,让每条kafka消息只传递一次?

Kafka暂不支持exactly once。它将在 0.11.0.0 版本中可用:https://issues.apache.org/jira/browse/KAFKA-4923 此版本计划于 2017 年 6 月 14 日发布,因此您可以等待或自己构建这个复杂的逻辑 ;-)

您必须在 Kafka 之外写出原子处理的最后一个偏移量以及处理结果。这可以是数据库或文件,只是不要进行两次写入,使其成为数据和偏移量的单个原子写入。如果您的消费者崩溃并且它或另一个实例重新启动或接管,您需要确保首先它读取与最后处理结果一起存储的最后一个偏移量,并在您 poll() 以获取更多消息之前 seek() 到该位置。这就是现有的 Kafka Sink Connector 有多少可以达到今天的 EOS 消费。