在事务中包装 StreamBridge 发送和 JPA 保存
Wrapping StreamBridge send and JPA save inside a transaction
我正在使用 Spring Boot 2.5.2 和 Spring Cloud 2020.0.3。我正在尝试包装一个休息服务调用,该调用使用 JPA (CrudRepository.save) 将记录保存到数据库,然后使用 StreamBridge post 使用 spring-cloud- 将消息发送到 Kafka 主题流(卡夫卡活页夹)。我尝试了几件事,但似乎没有什么是正确的。我故意造成 JPA 问题(插入一行会违反唯一键约束),但 Kafka 消息似乎仍会发送到代理。
- 我已经配置了一个 KafkaTransactionManager(没有使用 ChainedKafkaTransactionManager,因为它现在已被弃用)。但是,它似乎被忽略了,因为当配置中存在事务 ID 前缀时,StreamBridge 似乎会在内部创建自己的 tx mgr。
- 如果没有 transactional-id-prefix,ProducerFactory 根本就不是事务性的,这会导致 KafkaTransactionManager 实例化失败。
- 我试图完全避免创建自己的事务管理器,但这似乎也失败了,继续发送 kafka 消息。
配置此类流的正确方法是什么,以便对数据库和代理的写入都是原子的?
HTTP -> JPA 保存 -> Kafka 发送
您不需要事务管理器,但您确实需要 transactional.id 生产者工厂。
如果发送是在 JPA 事务的范围内执行的(例如 @Transactional
方法与 JPA TM),kafka 模板会将 Kafka 事务与现有事务同步并提交它,或者回滚它取决于主要交易。
你知道吗,即使是回滚的记录,实际上也是写入日志的?您必须将消费者 属性 isolation.level
设置为 read_committed
才能不接收回滚记录;它默认为 read_uncommitted
.
编辑
有一个 bug 将仅生产者交易同步到现有交易;发送是在本地事务中执行的。
作为解决方法,您可以使用 TransactionTemplate
启动 Kafka 事务:
@SpringBootApplication
public class So68460690Application {
public static void main(String[] args) {
SpringApplication.run(So68460690Application.class, args);
}
@Bean
public ApplicationRunner runner(StreamBridge bridge, Foo foo, KafkaTransactionManager<byte[], byte[]> ktm) {
return args -> {
new TransactionTemplate(ktm).executeWithoutResult(
status -> foo.doInTx(bridge)); // or execute() to return a result
};
}
@Bean
KafkaTransactionManager<byte[], byte[]> binderTM(BinderFactory bf) {
return new KafkaTransactionManager<>(((KafkaMessageChannelBinder) bf.getBinder("kafka", MessageChannel.class))
.getTransactionalProducerFactory());
}
}
@Component
class Foo {
@Transactional
public void doInTx(StreamBridge bridge) {
bridge.send("ouutput", "test");
throw new RuntimeException("testEx");
}
}
spring.cloud.stream.bindings.output.destination=so68460690
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.
spring.cloud.stream.kafka.binder.configuration.acks=all
logging.level.org.springframework.kafka=trace
2021-07-27 17:31:37.923 DEBUG 55933 --- [ main] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0] beginTransaction()
2021-07-27 17:31:37.924 DEBUG 55933 --- [ main] o.s.k.t.KafkaTransactionManager : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0]]
2021-07-27 17:31:37.927 DEBUG 55933 --- [ main] o.s.k.t.KafkaTransactionManager : Initiating transaction rollback
2021-07-27 17:31:37.928 DEBUG 55933 --- [ main] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0] abortTransaction()
我正在使用 Spring Boot 2.5.2 和 Spring Cloud 2020.0.3。我正在尝试包装一个休息服务调用,该调用使用 JPA (CrudRepository.save) 将记录保存到数据库,然后使用 StreamBridge post 使用 spring-cloud- 将消息发送到 Kafka 主题流(卡夫卡活页夹)。我尝试了几件事,但似乎没有什么是正确的。我故意造成 JPA 问题(插入一行会违反唯一键约束),但 Kafka 消息似乎仍会发送到代理。
- 我已经配置了一个 KafkaTransactionManager(没有使用 ChainedKafkaTransactionManager,因为它现在已被弃用)。但是,它似乎被忽略了,因为当配置中存在事务 ID 前缀时,StreamBridge 似乎会在内部创建自己的 tx mgr。
- 如果没有 transactional-id-prefix,ProducerFactory 根本就不是事务性的,这会导致 KafkaTransactionManager 实例化失败。
- 我试图完全避免创建自己的事务管理器,但这似乎也失败了,继续发送 kafka 消息。
配置此类流的正确方法是什么,以便对数据库和代理的写入都是原子的?
HTTP -> JPA 保存 -> Kafka 发送
您不需要事务管理器,但您确实需要 transactional.id 生产者工厂。
如果发送是在 JPA 事务的范围内执行的(例如 @Transactional
方法与 JPA TM),kafka 模板会将 Kafka 事务与现有事务同步并提交它,或者回滚它取决于主要交易。
你知道吗,即使是回滚的记录,实际上也是写入日志的?您必须将消费者 属性 isolation.level
设置为 read_committed
才能不接收回滚记录;它默认为 read_uncommitted
.
编辑
有一个 bug 将仅生产者交易同步到现有交易;发送是在本地事务中执行的。
作为解决方法,您可以使用 TransactionTemplate
启动 Kafka 事务:
@SpringBootApplication
public class So68460690Application {
public static void main(String[] args) {
SpringApplication.run(So68460690Application.class, args);
}
@Bean
public ApplicationRunner runner(StreamBridge bridge, Foo foo, KafkaTransactionManager<byte[], byte[]> ktm) {
return args -> {
new TransactionTemplate(ktm).executeWithoutResult(
status -> foo.doInTx(bridge)); // or execute() to return a result
};
}
@Bean
KafkaTransactionManager<byte[], byte[]> binderTM(BinderFactory bf) {
return new KafkaTransactionManager<>(((KafkaMessageChannelBinder) bf.getBinder("kafka", MessageChannel.class))
.getTransactionalProducerFactory());
}
}
@Component
class Foo {
@Transactional
public void doInTx(StreamBridge bridge) {
bridge.send("ouutput", "test");
throw new RuntimeException("testEx");
}
}
spring.cloud.stream.bindings.output.destination=so68460690
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.
spring.cloud.stream.kafka.binder.configuration.acks=all
logging.level.org.springframework.kafka=trace
2021-07-27 17:31:37.923 DEBUG 55933 --- [ main] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0] beginTransaction()
2021-07-27 17:31:37.924 DEBUG 55933 --- [ main] o.s.k.t.KafkaTransactionManager : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0]]
2021-07-27 17:31:37.927 DEBUG 55933 --- [ main] o.s.k.t.KafkaTransactionManager : Initiating transaction rollback
2021-07-27 17:31:37.928 DEBUG 55933 --- [ main] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0] abortTransaction()