How can I test that I have configured ChainedKafkaTransactionManager correctly in my Spring Boot service?

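My Spring Boot service needs to consume Kafka events from one topic, do some processing (including writing to the database with JPA), and then produce some events on a new topic. Whatever happens, I must never end up having published events without having updated the database, and if anything goes wrong I want the consumer's next poll to retry the event. My processing logic, including the database update, is idempotent, so retries are fine.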

I think I have achieved the exactly-once semantics described at https://docs.spring.io/spring-kafka/reference/html/#exactly-once by using a ChainedKafkaTransactionManager:

@Bean
public ChainedKafkaTransactionManager chainedTransactionManager(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
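    kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    // Transactions start in the order given and commit in reverse, so JPA commits first and Kafka last.
    return new ChainedKafkaTransactionManager(kafka, jpa);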
}

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        ChainedKafkaTransactionManager chainedTransactionManager) {

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setTransactionManager(chainedTransactionManager);

    return factory;
}
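To make the ordering concrete: a chained transaction manager starts its transactions in the order the managers are listed and commits them in reverse. The following is only a plain-Java model of that behaviour, not Spring's implementation; `ChainedCommitOrderSketch`, `Resource` and `run` are hypothetical names invented for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ChainedCommitOrderSketch {

    /** Hypothetical stand-in for one transactional resource (not a Spring type). */
    static final class Resource {
        final String name;
        final boolean failOnCommit;

        Resource(String name, boolean failOnCommit) {
            this.name = name;
            this.failOnCommit = failOnCommit;
        }
    }

    /**
     * Models a chained commit: begin in the given order, commit in reverse order.
     * If a commit fails, resources that already committed stay committed and the
     * ones not yet committed are rolled back. Returns the sequence of events.
     */
    static List<String> run(List<Resource> chain) {
        List<String> log = new ArrayList<>();
        for (Resource r : chain) {
            log.add("begin " + r.name);
        }
        for (int i = chain.size() - 1; i >= 0; i--) {
            Resource r = chain.get(i);
            if (r.failOnCommit) {
                log.add("commit " + r.name + " FAILED");
                // Everything earlier in the chain has not committed yet: roll it back.
                for (int j = i - 1; j >= 0; j--) {
                    log.add("rollback " + chain.get(j).name);
                }
                return log;
            }
            log.add("commit " + r.name);
        }
        return log;
    }

    public static void main(String[] args) {
        // Same order as new ChainedKafkaTransactionManager(kafka, jpa), with the Kafka commit failing.
        List<String> log = run(List.of(new Resource("kafka", true), new Resource("jpa", false)));
        log.forEach(System.out::println);
    }
}
```

With the Kafka commit failing, the model logs the JPA commit before the failed Kafka commit. That is the window the integration test needs to hit: the database row is already written, so the retried event relies on the processing being idempotent.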

The relevant Kafka configuration in my application.yaml file looks like this:

  kafka:
    ...
    consumer:
      group-id: myGroupId
      auto-offset-reset: earliest
      properties:
        isolation.level: read_committed
      ...
    producer:
      transaction-id-prefix: ${random.uuid}
      ...

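Because the commit order is critical to my application, I would like to write an integration test that proves the commits happen in the required order and that, if an error occurs while committing to Kafka, the original event is consumed again. However, I am struggling to find a good way to cause a failure between the DB commit and the Kafka commit.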

Any suggestions or alternative approaches?

Thanks

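You can use a custom ProducerFactory that returns a MockProducer (provided by kafka-clients).

Set its commitTransactionException so that it is thrown when the KTM (KafkaTransactionManager) attempts to commit the transaction.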

EDIT

Here is an example; it doesn't use the chained TM, but that shouldn't make any difference.

@SpringBootApplication
public class So66018178Application {

    public static void main(String[] args) {
        SpringApplication.run(So66018178Application.class, args);
    }

    @KafkaListener(id = "so66018178", topics = "so66018178")
    public void listen(String in) {
        System.out.println(in);
    }

}

spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.consumer.auto-offset-reset=earliest

@SpringBootTest(classes = { So66018178Application.class, So66018178ApplicationTests.Config.class })
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So66018178ApplicationTests {

    @Autowired
    EmbeddedKafkaBroker broker;

    @Test
    void kafkaCommitFails(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Config config)
            throws InterruptedException {

        // Stop the container so an AfterRollbackProcessor can be installed to capture the failure.
        registry.getListenerContainer("so66018178").stop();
        AtomicReference<Exception> listenerException = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(1);
        ((ConcurrentMessageListenerContainer<String, String>) registry.getListenerContainer("so66018178"))
                .setAfterRollbackProcessor(new AfterRollbackProcessor<>() {

                    @Override
                    public void process(List<ConsumerRecord<String, String>> records, Consumer<String, String> consumer,
                            Exception exception, boolean recoverable) {

                        listenerException.set(exception);
                        latch.countDown();
                    }
                });
        registry.getListenerContainer("so66018178").start();

        // Send one record with a real, non-transactional producer so that the listener is invoked.
        Map<String, Object> props = KafkaTestUtils.producerProps(this.broker);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
        KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
        template.send("so66018178", "test");
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(listenerException.get()).isInstanceOf(ListenerExecutionFailedException.class)
                .hasCause(config.exception);
    }

    @Configuration
    public static class Config {

        RuntimeException exception = new RuntimeException("test");

        @Bean
        public ProducerFactory<Object, Object> pf() {
            return new ProducerFactory<>() {

                @Override
                public Producer<Object, Object> createProducer() {
                    MockProducer<Object, Object> mockProducer = new MockProducer<>();
                    // MockProducer throws this exception from commitTransaction().
                    mockProducer.commitTransactionException = Config.this.exception;
                    return mockProducer;
                }

                @Override
                public Producer<Object, Object> createProducer(String txIdPrefix) {
                    Producer<Object, Object> producer = createProducer();
                    producer.initTransactions();
                    return producer;
                }

                @Override
                public boolean transactionCapable() {
                    return true;
                }

            };
        }

    }

}