关于故障点,如何分别确保生产者和消费者应用程序之间的kafka事务同步?

How to ensure kafka transaction synchronization between a producer and consumer application separately, regarding failure points?

总的来说,我对 Spring-Kafka/Kafka 还是有点陌生​​。我的问题很简短。我有一个 consumer only 应用程序,它不断从 Kafka 读取、处理消息,并使用 Ack Listener 手动确认它们。我有来自上游 producer-only 应用程序的依赖项,他们负责将消息发送到 Kafka 主题以便我使用。我们最近在生产者和消费者之间实现了交易,但我想更多地了解故障点以及如何处理那些回滚的交易,以免它们丢失?我读到最好使用 AfterRollbackProcessor 而不是 SeekToCurrentErrorHandler 进行 kafka 容器工厂的交易,同时 StatefulRetry 设置为 true .我使用事务的原因是为了在较新的版本中实现 exactly-once Kafka 语义,因为我们处理大量的数据库持久性并且由于数据库约束而无法承受重复的事务。我想知道我的 @KafkaListener 是否必须用 @Transactional 注释,因为我读过 post 之前说这不应该是这种情况,但其他 post 可能是这样是这样,这就是我不确定的原因。我已经看到很多关于生产者和消费者应用程序的问题,但我还没有看到关于分别具有这些不同角色的单独应用程序的问题(即使在一天结束时它可能是同一件事)。简而言之,我只是想知道将事务与 Kafka 合并时的最佳实践是什么,以及在这种情况下如何处理失败。

Kafka 事务对于仅限消费者的应用程序来说是不必要的开销。交易只有在产生记录时才有用。

I am using transactions is to achieve the exactly-once Kafka semantics in their newer release because we deal with a lot of database persistence and cannot afford duplicate transactions due to DB constraints.

当涉及其他技术时,没有保证"exactly once"。 Exactly once 仅适用于

read->process->write

读写为Kafka的场景。这是一个普遍的误解。

此外,即使仅使用 kafka read/process/write,"exactly once" 语义也仅适用于 whole shebang。即,只有在写入成功时才会提交读取的偏移量。

process 步骤将获得 至少一次 语义,因此无论何时在流程步骤的其他地方编写,都需要重复数据删除逻辑,无论是否有一个 Kafka 写步骤并且(如果有一个 Kafka 写)你在那里使用了一次事务。

对于从 Kafka 读取并写入数据库但不写入 Kafka 的情况,@Transactional 侦听器是正确的方法(使用重复数据删除逻辑来避免重复)。

对于您只需要一次 Kafka 语义(read/process/write)但又要在流程步骤中写入数据库的情况,您可以在侦听器容器中使用 ChainedKafkaTransactionManager,以便数据库事务是与 Kafka 事务同步(但对于 DB 提交成功但 Kafka 事务失败的情况,仍然存在一个小 window)。因此,即便如此,您仍然需要重复数据删除逻辑。在这种情况下,您 想要 @Transactional 监听器。

编辑

仅限生产者有点不同;假设你想在一个事务中发布 10 条记录,你希望它们全部进入(提交)或退出(回滚)。那么你必须使用事务。

事务中生成的记录的消费者应该有 isolation.level=read_committed,这样他们就不会看到未提交的写入(默认为 read_uncommitted)。

如果您一次只发布一条记录,并且不涉及其他事务资源,那么如果只涉及 Kafka,那么使用事务就没什么意义了。

但是,如果您正在从 DB 或 JMS 等读取数据并写入 Kafka,您可能希望同步 DB 和 Kafka 事务,但是重复的概率仍然不为零;您如何处理取决于您提交交易的顺序。

通常,重复数据删除依赖于应用程序;通常会使用应用程序数据中的某些键,例如,SQL INSERT 语句以数据库中尚不存在的键为条件。

Kafka为每条记录提供了一个方便的唯一键,结合topic/partition/offset。您可以将它们与数据一起存储在数据库中以防止重复。

EDIT2

SeekToCurrentErrorHandler (STCEH) 通常在不使用事务时使用;当侦听器抛出异常时,错误处理程序会重置偏移量,以便在下一次轮询时重新获取记录。经过一些尝试后,我们放弃并调用一个 "recoverer",例如 DeadLetterPublishingRecoverer 将失败的记录写入另一个主题。

但是它仍然可以用于交易。

错误处理程序在事务范围内调用( 回滚之前)因此,如果它抛出异常(它会抛出异常,除非恢复器"consumes"失败),事务仍将回滚。如果恢复成功,事务将提交。

在将恢复功能添加到 STCEH 之前添加了 AfterRollbackProcessor (ARP)。它本质上与 STCEH 完全相同,但它在事务范围 之外 运行(回滚 after)。

配置两者不会造成任何伤害,因为如果 STCEH 已经执行了查找,ARP 将无事可做。

我仍然更喜欢将 ARP 与事务一起使用,而 STCEH 则不使用 - 如果只是为了获得日志消息的适当日志类别。可能还有其他我暂时想不起来的原因。

请注意,现在STCEH和ARP都支持重试和退避,根本不需要配置侦听器级别的状态重试。如果您想使用内存中重试而不导致往返代理以重新获取相同记录,无状态重试可能仍然有用。