Kafka偏移量管理和与数据库同步

Kafka offset management and sync with DB

我正在开发一个使用 Kafka 流和数据库的应用程序。

在我的应用程序中,我手动管理 Kafka 偏移量并仅在成功处理消息的情况下提交偏移量(即在成功处理和更新到 DB 之后)。

但是,如果在更新数据库之后我的应用程序在提交之前宕机,那么当它恢复时会由于未提交的偏移量而导致重复写入数据库。

我想避免这些重复,同时仍确保我正在处理每一条消息。执行此操作的正确方法是什么?

编辑:我对数据库的更新基本上是将记录的计数器增加了某个值。所以 MERGE 语句不是一个选项。

这有点棘手。

Kafka 支持 exactly-once 语义。但是当你将数据写入外部数据存储时,你需要确保消费者端的恰好一次。

实现此目的的一种方法(如 Jay Kreps here 所提议)是,将 Kafka 偏移量作为单个事务的一部分维护在数据存储中。因此,如果您为每个分区维护最后一个偏移量,那么当您收到的偏移量小于存储在数据库中的偏移量时,您始终可以忽略来自给定分区的消息。

但是,这种方法有一个警告。如果你有一个多数据中心的主动-主动部署,如果主集群出现故障,消费者回退到不同的不同数据中心集群,你不能盲目地依赖偏移量。偏移量是一个物理 id,一个集群中消息的偏移量可以不同于另一个集群中复制消息的偏移量。

在这些情况下,我认为正确的方法是利用 Kafka 流并在 Kafka table (KTable) 中维护计数,该计数存储在压缩的 Kafka 主题中。 Kafka内部会使用producer id, epoch, transaction id等来保证exactly once语义