如何测试我是否已在我的 spring 引导服务中正确配置了 ChainedKafkaTransactionManager
How can I test that I have configured ChainedKafkaTransactionManager correctly in my spring boot service
我的 spring 引导服务需要使用一个主题的 kafka 事件,进行一些处理(包括使用 JPA 写入数据库),然后在一个新主题上生成一些事件。无论发生什么情况,我都不会出现在不更新数据库的情况下发布事件的情况,如果出现任何问题,那么我希望消费者的下一次投票重试该事件。我的处理逻辑包括数据库更新是幂等的,所以重试没问题
我 认为 我已经通过使用 ChainedKafkaTransactionManager 实现了 https://docs.spring.io/spring-kafka/reference/html/#exactly-once 中描述的恰好一次语义:
@Bean
public ChainedKafkaTransactionManager chainedTransactionManager(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return new ChainedKafkaTransactionManager(kafka, jpa);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
ChainedKafkaTransactionManager chainedTransactionManager) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setTransactionManager(chainedTransactionManager);
return factory;
}
我的 application.yaml 文件中的相关 kafka 配置如下所示:
kafka:
...
consumer:
group-id: myGroupId
auto-offset-reset: earliest
properties:
isolation.level: read_committed
...
producer:
transaction-id-prefix: ${random.uuid}
...
因为提交顺序对我的应用程序至关重要,所以我想编写一个集成测试来证明提交以所需的顺序发生,并且如果在提交到 kafka 期间发生错误,那么原始事件将再次被消耗.但是,我正在努力寻找一种在 db 提交和 kafka 提交之间导致失败的好方法。
有什么建议或替代方法吗?
谢谢
您可以使用自定义 ProducerFactory
到 return MockProducer
(由 kafka-clients
提供。
设置 commitTransactionException
以便在 KTM 尝试提交事务时抛出它。
编辑
这是一个例子;它不使用链式 TM,但这应该没什么区别。
@SpringBootApplication
public class So66018178Application {
public static void main(String[] args) {
SpringApplication.run(So66018178Application.class, args);
}
@KafkaListener(id = "so66018178", topics = "so66018178")
public void listen(String in) {
System.out.println(in);
}
}
spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.consumer.auto-offset-reset=earliest
@SpringBootTest(classes = { So66018178Application.class, So66018178ApplicationTests.Config.class })
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So66018178ApplicationTests {
@Autowired
EmbeddedKafkaBroker broker;
@Test
void kafkaCommitFails(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Config config)
throws InterruptedException {
registry.getListenerContainer("so66018178").stop();
AtomicReference<Exception> listenerException = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
((ConcurrentMessageListenerContainer<String, String>) registry.getListenerContainer("so66018178"))
.setAfterRollbackProcessor(new AfterRollbackProcessor<>() {
@Override
public void process(List<ConsumerRecord<String, String>> records, Consumer<String, String> consumer,
Exception exception, boolean recoverable) {
listenerException.set(exception);
latch.countDown();
}
});
registry.getListenerContainer("so66018178").start();
Map<String, Object> props = KafkaTestUtils.producerProps(this.broker);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
template.send("so66018178", "test");
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(listenerException.get()).isInstanceOf(ListenerExecutionFailedException.class)
.hasCause(config.exception);
}
@Configuration
public static class Config {
RuntimeException exception = new RuntimeException("test");
@Bean
public ProducerFactory<Object, Object> pf() {
return new ProducerFactory<>() {
@Override
public Producer<Object, Object> createProducer() {
MockProducer<Object, Object> mockProducer = new MockProducer<>();
mockProducer.commitTransactionException = Config.this.exception;
return mockProducer;
}
@Override
public Producer<Object, Object> createProducer(String txIdPrefix) {
Producer<Object, Object> producer = createProducer();
producer.initTransactions();
return producer;
}
@Override
public boolean transactionCapable() {
return true;
}
};
}
}
}
我的 spring 引导服务需要使用一个主题的 kafka 事件,进行一些处理(包括使用 JPA 写入数据库),然后在一个新主题上生成一些事件。无论发生什么情况,我都不会出现在不更新数据库的情况下发布事件的情况,如果出现任何问题,那么我希望消费者的下一次投票重试该事件。我的处理逻辑包括数据库更新是幂等的,所以重试没问题
我 认为 我已经通过使用 ChainedKafkaTransactionManager 实现了 https://docs.spring.io/spring-kafka/reference/html/#exactly-once 中描述的恰好一次语义:
@Bean
public ChainedKafkaTransactionManager chainedTransactionManager(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return new ChainedKafkaTransactionManager(kafka, jpa);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
ChainedKafkaTransactionManager chainedTransactionManager) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setTransactionManager(chainedTransactionManager);
return factory;
}
我的 application.yaml 文件中的相关 kafka 配置如下所示:
kafka:
...
consumer:
group-id: myGroupId
auto-offset-reset: earliest
properties:
isolation.level: read_committed
...
producer:
transaction-id-prefix: ${random.uuid}
...
因为提交顺序对我的应用程序至关重要,所以我想编写一个集成测试来证明提交以所需的顺序发生,并且如果在提交到 kafka 期间发生错误,那么原始事件将再次被消耗.但是,我正在努力寻找一种在 db 提交和 kafka 提交之间导致失败的好方法。
有什么建议或替代方法吗?
谢谢
您可以使用自定义 ProducerFactory
到 return MockProducer
(由 kafka-clients
提供。
设置 commitTransactionException
以便在 KTM 尝试提交事务时抛出它。
编辑
这是一个例子;它不使用链式 TM,但这应该没什么区别。
@SpringBootApplication
public class So66018178Application {
public static void main(String[] args) {
SpringApplication.run(So66018178Application.class, args);
}
@KafkaListener(id = "so66018178", topics = "so66018178")
public void listen(String in) {
System.out.println(in);
}
}
spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.consumer.auto-offset-reset=earliest
@SpringBootTest(classes = { So66018178Application.class, So66018178ApplicationTests.Config.class })
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So66018178ApplicationTests {
@Autowired
EmbeddedKafkaBroker broker;
@Test
void kafkaCommitFails(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Config config)
throws InterruptedException {
registry.getListenerContainer("so66018178").stop();
AtomicReference<Exception> listenerException = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
((ConcurrentMessageListenerContainer<String, String>) registry.getListenerContainer("so66018178"))
.setAfterRollbackProcessor(new AfterRollbackProcessor<>() {
@Override
public void process(List<ConsumerRecord<String, String>> records, Consumer<String, String> consumer,
Exception exception, boolean recoverable) {
listenerException.set(exception);
latch.countDown();
}
});
registry.getListenerContainer("so66018178").start();
Map<String, Object> props = KafkaTestUtils.producerProps(this.broker);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
template.send("so66018178", "test");
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(listenerException.get()).isInstanceOf(ListenerExecutionFailedException.class)
.hasCause(config.exception);
}
@Configuration
public static class Config {
RuntimeException exception = new RuntimeException("test");
@Bean
public ProducerFactory<Object, Object> pf() {
return new ProducerFactory<>() {
@Override
public Producer<Object, Object> createProducer() {
MockProducer<Object, Object> mockProducer = new MockProducer<>();
mockProducer.commitTransactionException = Config.this.exception;
return mockProducer;
}
@Override
public Producer<Object, Object> createProducer(String txIdPrefix) {
Producer<Object, Object> producer = createProducer();
producer.initTransactions();
return producer;
}
@Override
public boolean transactionCapable() {
return true;
}
};
}
}
}