事务性 kafka 侦听器重试
Transactional kafka listener retry
我正在尝试创建一个 Spring Kafka @KafkaListener
,它既是事务性的(kafa 和数据库)又使用重试。我正在使用 Spring 启动。错误处理程序的文档说
When transactions are being used, no error handlers are configured, by default, so that the exception will roll back the transaction. Error handling for transactional containers are handled by the AfterRollbackProcessor. If you provide a custom error handler when using transactions, it must throw an exception if you want the transaction rolled back (source).
然而,当我用 @Transactional("kafkaTransactionManager
) 注释配置我的侦听器时,即使我可以清楚地看到模板在引发异常时回滚生成的消息,容器实际上使用了一个非空 commonErrorHandler
而不是 AfterRollbackProcessor
。即使我在容器工厂中将 commonErrorHandler
显式配置为 null 也是如此。我没有看到任何证据表明我配置的 AfterRollbackProcessor
曾经被调用过,即使在 commonErrorHandler
耗尽其重试策略之后也是如此。
我不确定 Spring Kafka 的错误处理在这一点上一般是如何工作的,我正在寻求澄清。我要回答的问题是:
- 使用 Spring-Kafka 2.8.0 配置事务性 kafka 侦听器的推荐方法是什么?我做对了吗?
- 确实应该使用公共错误处理程序而不是后回滚处理器吗?是否在根据重试策略再次尝试处理消息之前回滚当前事务?
- 一般来说,当我有一个事务性的 kafka 侦听器时,我是否应该注意不止一层的错误处理?例如。如果我的公共错误处理程序重新抛出类型 T 的异常,另一个处理程序会捕获它并可能开始自己的重试吗?
谢谢!
我的代码:
@Configuration
@EnableScheduling
@EnableKafka
public class KafkaConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConfiguration.class);
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConsumerFactory<Object, Object> consumerFactory) {
var factory = new ConcurrentKafkaListenerContainerFactory<Integer, Object>();
factory.setConsumerFactory(consumerFactory);
var afterRollbackProcessor =
new DefaultAfterRollbackProcessor<Object, Object>(
(record, e) -> LOGGER.info("After rollback processor triggered! {}", e.getMessage()),
new FixedBackOff(1_000, 1));
// Configures different error handling for different listeners.
factory.setContainerCustomizer(
container -> {
var groupId = container.getContainerProperties().getGroupId();
if (groupId.equals("InputProcessorHigh") || groupId.equals("InputProcessorLow")) {
container.setAfterRollbackProcessor(afterRollbackProcessor);
// If I set commonErrorHandler to null, it is defaulted instead.
}
});
return factory;
}
}
@Component
public class InputProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(InputProcessor.class);
private final KafkaTemplate<Integer, Object> template;
private final AuditLogRepository repository;
@Autowired
public InputProcessor(KafkaTemplate<Integer, Object> template, AuditLogRepository repository) {
this.template = template;
this.repository = repository;
}
@KafkaListener(id = "InputProcessorHigh", topics = "input-high", concurrency = "3")
@Transactional("kafkaTransactionManager")
public void inputHighProcessor(ConsumerRecord<Integer, Input> input) {
processInputs(input);
}
@KafkaListener(id = "InputProcessorLow", topics = "input-low", concurrency = "1")
@Transactional("kafkaTransactionManager")
public void inputLowProcessor(ConsumerRecord<Integer, Input> input) {
processInputs(input);
}
public void processInputs(ConsumerRecord<Integer, Input> input) {
var key = input.key();
var message = input.value().getMessage();
var output = new Output().setMessage(message);
LOGGER.info("Processing {}", message);
template.send("output-left", key, output);
repository.createIfNotExists(message); // idempotent insert
template.send("output-right", key, output);
if (message.contains("ERROR")) {
throw new RuntimeException("Simulated processing error!");
}
}
}
我的application.yaml(减去我的bootstrap-服务器和安全配置):
spring:
kafka:
consumer:
auto-offset-reset: 'earliest'
key-deserializer: 'org.apache.kafka.common.serialization.IntegerDeserializer'
value-deserializer: 'org.springframework.kafka.support.serializer.JsonDeserializer'
isolation-level: 'read_committed'
properties:
spring.json.trusted.packages: 'java.util,java.lang,com.github.tomboyo.silverbroccoli.*'
producer:
transaction-id-prefix: 'tx-'
key-serializer: 'org.apache.kafka.common.serialization.IntegerSerializer'
value-serializer: 'org.springframework.kafka.support.serializer.JsonSerializer'
[编辑](解决方案代码)
在 Gary 的帮助下,我才弄明白了。正如他们所说,我们需要在容器上设置kafka事务管理器,这样容器才能启动事务。交易文档没有介绍如何执行此操作,但有几种方法。首先,我们可以从工厂获取可变容器属性对象,并在 that:
上设置事务管理器
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
var factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setTransactionManager(...);
return factory;
}
如果我们在 Spring 启动,我们可以重新使用一些自动配置来在我们自定义之前在我们的工厂设置合理的默认值。我们可以看到 KafkaAutoConfiguration
模块导入了 KafkaAnnotationDrivenConfiguration
,它产生了一个 ConcurrentKafkaListenerContainerFactoryConfigurer
bean。这似乎是 Spring-Boot 应用程序中所有默认配置的原因。因此,我们可以注入那个 bean 并在添加定制之前用它来初始化我们的工厂:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer bootConfigurer,
ConsumerFactory<Object, Object> consumerFactory) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
// Apply default spring-boot configuration.
bootConfigurer.configure(factory, consumerFactory);
factory.setContainerCustomizer(
container -> {
... // do whatever
});
return factory;
}
完成后,容器将按预期使用 AfterRollbackProcessor 进行错误处理。只要我不显式配置通用错误处理程序,这似乎是唯一的异常处理层。
AfterRollbackProcessor
仅在容器知道事务时使用;您必须向容器提供 KafkaTransactionManager
以便容器启动 kafka 事务,并将偏移量发送到事务。使用 @Transactional
不是启动 Kafka 事务的正确方法。
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#transactions
我正在尝试创建一个 Spring Kafka @KafkaListener
,它既是事务性的(kafa 和数据库)又使用重试。我正在使用 Spring 启动。错误处理程序的文档说
When transactions are being used, no error handlers are configured, by default, so that the exception will roll back the transaction. Error handling for transactional containers are handled by the AfterRollbackProcessor. If you provide a custom error handler when using transactions, it must throw an exception if you want the transaction rolled back (source).
然而,当我用 @Transactional("kafkaTransactionManager
) 注释配置我的侦听器时,即使我可以清楚地看到模板在引发异常时回滚生成的消息,容器实际上使用了一个非空 commonErrorHandler
而不是 AfterRollbackProcessor
。即使我在容器工厂中将 commonErrorHandler
显式配置为 null 也是如此。我没有看到任何证据表明我配置的 AfterRollbackProcessor
曾经被调用过,即使在 commonErrorHandler
耗尽其重试策略之后也是如此。
我不确定 Spring Kafka 的错误处理在这一点上一般是如何工作的,我正在寻求澄清。我要回答的问题是:
- 使用 Spring-Kafka 2.8.0 配置事务性 kafka 侦听器的推荐方法是什么?我做对了吗?
- 确实应该使用公共错误处理程序而不是后回滚处理器吗?是否在根据重试策略再次尝试处理消息之前回滚当前事务?
- 一般来说,当我有一个事务性的 kafka 侦听器时,我是否应该注意不止一层的错误处理?例如。如果我的公共错误处理程序重新抛出类型 T 的异常,另一个处理程序会捕获它并可能开始自己的重试吗?
谢谢!
我的代码:
@Configuration
@EnableScheduling
@EnableKafka
public class KafkaConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConfiguration.class);
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConsumerFactory<Object, Object> consumerFactory) {
var factory = new ConcurrentKafkaListenerContainerFactory<Integer, Object>();
factory.setConsumerFactory(consumerFactory);
var afterRollbackProcessor =
new DefaultAfterRollbackProcessor<Object, Object>(
(record, e) -> LOGGER.info("After rollback processor triggered! {}", e.getMessage()),
new FixedBackOff(1_000, 1));
// Configures different error handling for different listeners.
factory.setContainerCustomizer(
container -> {
var groupId = container.getContainerProperties().getGroupId();
if (groupId.equals("InputProcessorHigh") || groupId.equals("InputProcessorLow")) {
container.setAfterRollbackProcessor(afterRollbackProcessor);
// If I set commonErrorHandler to null, it is defaulted instead.
}
});
return factory;
}
}
@Component
public class InputProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(InputProcessor.class);
private final KafkaTemplate<Integer, Object> template;
private final AuditLogRepository repository;
@Autowired
public InputProcessor(KafkaTemplate<Integer, Object> template, AuditLogRepository repository) {
this.template = template;
this.repository = repository;
}
@KafkaListener(id = "InputProcessorHigh", topics = "input-high", concurrency = "3")
@Transactional("kafkaTransactionManager")
public void inputHighProcessor(ConsumerRecord<Integer, Input> input) {
processInputs(input);
}
@KafkaListener(id = "InputProcessorLow", topics = "input-low", concurrency = "1")
@Transactional("kafkaTransactionManager")
public void inputLowProcessor(ConsumerRecord<Integer, Input> input) {
processInputs(input);
}
public void processInputs(ConsumerRecord<Integer, Input> input) {
var key = input.key();
var message = input.value().getMessage();
var output = new Output().setMessage(message);
LOGGER.info("Processing {}", message);
template.send("output-left", key, output);
repository.createIfNotExists(message); // idempotent insert
template.send("output-right", key, output);
if (message.contains("ERROR")) {
throw new RuntimeException("Simulated processing error!");
}
}
}
我的application.yaml(减去我的bootstrap-服务器和安全配置):
spring:
kafka:
consumer:
auto-offset-reset: 'earliest'
key-deserializer: 'org.apache.kafka.common.serialization.IntegerDeserializer'
value-deserializer: 'org.springframework.kafka.support.serializer.JsonDeserializer'
isolation-level: 'read_committed'
properties:
spring.json.trusted.packages: 'java.util,java.lang,com.github.tomboyo.silverbroccoli.*'
producer:
transaction-id-prefix: 'tx-'
key-serializer: 'org.apache.kafka.common.serialization.IntegerSerializer'
value-serializer: 'org.springframework.kafka.support.serializer.JsonSerializer'
[编辑](解决方案代码)
在 Gary 的帮助下,我才弄明白了。正如他们所说,我们需要在容器上设置kafka事务管理器,这样容器才能启动事务。交易文档没有介绍如何执行此操作,但有几种方法。首先,我们可以从工厂获取可变容器属性对象,并在 that:
上设置事务管理器@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
var factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setTransactionManager(...);
return factory;
}
如果我们在 Spring 启动,我们可以重新使用一些自动配置来在我们自定义之前在我们的工厂设置合理的默认值。我们可以看到 KafkaAutoConfiguration
模块导入了 KafkaAnnotationDrivenConfiguration
,它产生了一个 ConcurrentKafkaListenerContainerFactoryConfigurer
bean。这似乎是 Spring-Boot 应用程序中所有默认配置的原因。因此,我们可以注入那个 bean 并在添加定制之前用它来初始化我们的工厂:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer bootConfigurer,
ConsumerFactory<Object, Object> consumerFactory) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
// Apply default spring-boot configuration.
bootConfigurer.configure(factory, consumerFactory);
factory.setContainerCustomizer(
container -> {
... // do whatever
});
return factory;
}
完成后,容器将按预期使用 AfterRollbackProcessor 进行错误处理。只要我不显式配置通用错误处理程序,这似乎是唯一的异常处理层。
AfterRollbackProcessor
仅在容器知道事务时使用;您必须向容器提供 KafkaTransactionManager
以便容器启动 kafka 事务,并将偏移量发送到事务。使用 @Transactional
不是启动 Kafka 事务的正确方法。
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#transactions