Spring Kafka 中的事务同步

Transaction Synchronization in Spring Kafka

我想将 kafka 事务与存储库事务同步:

@Transactional
public void syncTransaction(){
  myRepository.save(someObject)
  kafkaTemplate.send(someEvent)
}

自合并 (https://github.com/spring-projects/spring-kafka/issues/373) 以来,根据文档,这是可能的。尽管如此,我在理解和实现该功能时遇到了问题。 查看 https://docs.spring.io/spring-kafka/reference/html/#transaction-synchronization 中的示例,我必须创建一个 MessageListenerContainer 来监听我自己的事件。 我还需要使用 KafkaTemplate 发送我的事件吗? MessageListenerContainer 是否禁止发送给代理?

如果我理解正确的话,kafkaTemplate 和 kafkaTransactionManager 必须使用同一个 producerFactory,我必须在其中启用事务设置 transactionIdPrefix。在我的示例中,我必须将 messageListenerContainer 的 TransactionManager 设置为 DataSourceTransactionManager。对吗?

从我的角度来看,我通过 kafkaTemplate 发送事件,监听我自己的事件并再次使用 kafkaTemplate 转发事件看起来很奇怪。

如果我能得到一个 kafka 事务与存储库事务的简单同步示例和解释,我真的会帮助我。

如果侦听器容器配置了 KafkaTransactionManager,容器将创建一个生产者,任何下游 kafka 模板都将使用该生产者,并且容器将为您将偏移量发送到事务。

如果容器有其他事务管理器,则容器无法发送偏移量,因为它无权访问生产者(或模板)。

另一种解决方案是使用 @Transactional(使用数据源 TM)注释您的方法,并使用 kafka TM 配置容器。

这样,您的数据库 tx 将在线程 returns 之前提交到容器,然后容器将偏移量发送到 kafka 事务并提交它。

有关示例,请参阅 the framework test cases

@Eike Behrends 要有一个 db + kafka 事务,你可以使用 ChainedTransactionManager 并这样定义它:

@Bean
public KafkaTransactionManager kafkaTransactionManager() {
    KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());;
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return ktm;
}


@Bean
@Primary
public JpaTransactionManager transactionManager(EntityManagerFactory em) {
    return new JpaTransactionManager(em);
}

@Bean(name = "chainedTransactionManager")
public ChainedTransactionManager chainedTransactionManager(JpaTransactionManager jpaTransactionManager,
                                                           KafkaTransactionManager kafkaTransactionManager) {
    return new ChainedTransactionManager(kafkaTransactionManager, jpaTransactionManager);
}

您需要注释您的事务性 db+kafka 方法@Transactional("chainedTransactionManager")

(您可以在 spring-kafka 项目上看到问题:https://github.com/spring-projects/spring-kafka/issues/433

你说:

From my perspective it looks weird that I send an event via kafkaTemplate, listen to my own event and forward the event using the kafkaTemplate again.

你试过吗?如果是这样,你能举个例子吗?

为了实现您的目标,您应该使用不同的“最终一致”方法,例如 CDC(更改数据捕获)。 Kafka 写入和任何其他系统(例如数据库)之间没有原子事务 - 也称为 XA 事务。这是一个完整的范例 swift 当你有分布式服务(有些人称之为微服务)时,在你的情况下可能通过生产/消费到/从 Kafka 主题进行通信。

TL;DR: 只需使用更新插入/合并。

无意中看到这个老话题,这么多年了,人家还在纠结

只想分享最简单最原生的处理kafka等系统的方法

人们来这里寻求答案的真正问题是分布式事务的旧方法。大多数人希望将非事务性(kafka 将其命名为事务功能,但实际上它们是“特殊的”)kafka 与某些 ACID 数据库同步。

如果您的服务在 idempotent 环境中工作 - 下游的所有内容也应该是 idempotent

只需确保您对底层存储的操作是幂等,最简单的方法是更新插入/合并(取决于存储)。

P.s。 CDC是一个东西,但它需要更多的人工成本,在大多数典型情况下是不必要的。

更多: 如果你想深入了解为什么 kafka “交易”很特别,这里有很好的起点(在 eos 中解释):