如何使用 Spring Data 和 Kafka 处理跨越多个线程的事务
How to handle transactions spanning multiple threads with Spring Data and Kafka
我正在开发一个应用程序,该应用程序使用 Kafka 来消费来自多个主题的消息,并在数据运行时保留数据。
为此,我使用@Service class,其中有几个方法用@kafkaListener 注释。考虑一下:
@Transactional
@KafkaListener(topics = MyFirstMessage.TOPIC, autoStartup = "false", containerFactory = "myFirstKafkaListenerContainerFactory")
public void handleMyFirstMessage(ConsumerRecord<String, MyFirstMessage> record, Acknowledgment acknowledgment) throws Exception {
MyFirstMessage message = consume(record, acknowledgment);
try {
doHandle(record.key(), message);
} catch (Exception e) {
TransactionInterceptor.currentTransactionStatus().setRollbackOnly();
} finally {
acknowledgment.acknowledge();
}
}
@Transactional
@KafkaListener(topics = MySecondMessage.TOPIC, autoStartup = "false", containerFactory = "mySecondKafkaListenerContainerFactory")
public void handleMySecondMessage(ConsumerRecord<String, MySecondMessage> record, Acknowledgment acknowledgment) throws Exception {
MySecondMessage message = consume(record, acknowledgment);
try {
doHandle(record.key(), message);
} catch (Exception e) {
TransactionInterceptor.currentTransactionStatus().setRollbackOnly();
} finally {
acknowledgment.acknowledge();
}
}
请无视setRollbackOnly的内容,与本题无关
相关的是每个侦听器中的 doHandle() 方法在 table 中执行插入操作,这偶尔会失败,因为一旦完成最终提交,自动生成的键就会变得不唯一。
发生的情况是每个 doHandle() 方法都会在它们自己的小事务中递增键列,并且只有其中一个会“赢得”那场比赛。另一个将在提交期间失败,并违反非唯一约束。
处理此问题的最佳做法是什么?我如何“同步”事务以像字符串上的珍珠一样执行而不是一次全部执行?
我正在考虑使用某种信号量或锁来序列化事物,但这听起来像是一个有很多陷阱的解决方案。如果有一个通用的模式或框架来帮助解决这个问题,我会更愿意table实施它。
对数据库使用 @Transactional
并在侦听器容器中使用 KafkaTransactionManager
类似于在容器中使用 ChainedKafkaTransactionManager
(配置了两个 TM)。当监听器正常退出时,DB tx 被提交,然后是 Kafka。
当侦听器抛出异常时,两个事务以相同的顺序回滚。
setRollbackOnly
与这个问题绝对相关,因为您在执行此操作时不会回滚 kafka 事务。
我正在开发一个应用程序,该应用程序使用 Kafka 来消费来自多个主题的消息,并在数据运行时保留数据。
为此,我使用@Service class,其中有几个方法用@kafkaListener 注释。考虑一下:
@Transactional
@KafkaListener(topics = MyFirstMessage.TOPIC, autoStartup = "false", containerFactory = "myFirstKafkaListenerContainerFactory")
public void handleMyFirstMessage(ConsumerRecord<String, MyFirstMessage> record, Acknowledgment acknowledgment) throws Exception {
MyFirstMessage message = consume(record, acknowledgment);
try {
doHandle(record.key(), message);
} catch (Exception e) {
TransactionInterceptor.currentTransactionStatus().setRollbackOnly();
} finally {
acknowledgment.acknowledge();
}
}
@Transactional
@KafkaListener(topics = MySecondMessage.TOPIC, autoStartup = "false", containerFactory = "mySecondKafkaListenerContainerFactory")
public void handleMySecondMessage(ConsumerRecord<String, MySecondMessage> record, Acknowledgment acknowledgment) throws Exception {
MySecondMessage message = consume(record, acknowledgment);
try {
doHandle(record.key(), message);
} catch (Exception e) {
TransactionInterceptor.currentTransactionStatus().setRollbackOnly();
} finally {
acknowledgment.acknowledge();
}
}
请无视setRollbackOnly的内容,与本题无关
相关的是每个侦听器中的 doHandle() 方法在 table 中执行插入操作,这偶尔会失败,因为一旦完成最终提交,自动生成的键就会变得不唯一。
发生的情况是每个 doHandle() 方法都会在它们自己的小事务中递增键列,并且只有其中一个会“赢得”那场比赛。另一个将在提交期间失败,并违反非唯一约束。
处理此问题的最佳做法是什么?我如何“同步”事务以像字符串上的珍珠一样执行而不是一次全部执行?
我正在考虑使用某种信号量或锁来序列化事物,但这听起来像是一个有很多陷阱的解决方案。如果有一个通用的模式或框架来帮助解决这个问题,我会更愿意table实施它。
对数据库使用 @Transactional
并在侦听器容器中使用 KafkaTransactionManager
类似于在容器中使用 ChainedKafkaTransactionManager
(配置了两个 TM)。当监听器正常退出时,DB tx 被提交,然后是 Kafka。
当侦听器抛出异常时,两个事务以相同的顺序回滚。
setRollbackOnly
与这个问题绝对相关,因为您在执行此操作时不会回滚 kafka 事务。