在事务中包装 StreamBridge 发送和 JPA 保存

Wrapping StreamBridge send and JPA save inside a transaction

我正在使用 Spring Boot 2.5.2 和 Spring Cloud 2020.0.3。我正在尝试包装一个休息服务调用,该调用使用 JPA (CrudRepository.save) 将记录保存到数据库,然后使用 StreamBridge post 使用 spring-cloud- 将消息发送到 Kafka 主题流(卡夫卡活页夹)。我尝试了几件事,但似乎没有什么是正确的。我故意造成 JPA 问题(插入一行会违反唯一键约束),但 Kafka 消息似乎仍会发送到代理。

  1. 我已经配置了一个 KafkaTransactionManager(没有使用 ChainedKafkaTransactionManager,因为它现在已被弃用)。但是,它似乎被忽略了,因为当配置中存在事务 ID 前缀时,StreamBridge 似乎会在内部创建自己的 tx mgr。
  2. 如果没有 transactional-id-prefix,ProducerFactory 根本就不是事务性的,这会导致 KafkaTransactionManager 实例化失败。
  3. 我试图完全避免创建自己的事务管理器,但这似乎也失败了,继续发送 kafka 消息。

配置此类流的正确方法是什么,以便对数据库和代理的写入都是原子的?

HTTP -> JPA 保存 -> Kafka 发送

您不需要事务管理器,但您确实需要 transactional.id 生产者工厂。

如果发送是在 JPA 事务的范围内执行的(例如 @Transactional 方法与 JPA TM),kafka 模板会将 Kafka 事务与现有事务同步并提交它,或者回滚它取决于主要交易。

你知道吗,即使是回滚的记录,实际上也是写入日志的?您必须将消费者 属性 isolation.level 设置为 read_committed 才能不接收回滚记录;它默认为 read_uncommitted.

编辑

有一个 bug 将仅生产者交易同步到现有交易;发送是在本地事务中执行的。

作为解决方法,您可以使用 TransactionTemplate 启动 Kafka 事务:

@SpringBootApplication
public class So68460690Application {

    public static void main(String[] args) {
        SpringApplication.run(So68460690Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(StreamBridge bridge, Foo foo, KafkaTransactionManager<byte[], byte[]> ktm) {
        return args -> {
            new TransactionTemplate(ktm).executeWithoutResult(
                    status -> foo.doInTx(bridge)); // or execute() to return a result
        };
    }

    @Bean
    KafkaTransactionManager<byte[], byte[]> binderTM(BinderFactory bf) {
        return new KafkaTransactionManager<>(((KafkaMessageChannelBinder) bf.getBinder("kafka", MessageChannel.class))
                .getTransactionalProducerFactory());
    }

}

@Component
class Foo {

    @Transactional
    public void doInTx(StreamBridge bridge) {
        bridge.send("ouutput", "test");
        throw new RuntimeException("testEx");
    }

}
spring.cloud.stream.bindings.output.destination=so68460690

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.
spring.cloud.stream.kafka.binder.configuration.acks=all


logging.level.org.springframework.kafka=trace
2021-07-27 17:31:37.923 DEBUG 55933 --- [           main] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0] beginTransaction()
2021-07-27 17:31:37.924 DEBUG 55933 --- [           main] o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0]]
2021-07-27 17:31:37.927 DEBUG 55933 --- [           main] o.s.k.t.KafkaTransactionManager          : Initiating transaction rollback
2021-07-27 17:31:37.928 DEBUG 55933 --- [           main] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0] abortTransaction()