Kafka 消费者错误处理:混淆 ErrorHandling / RetryTemplate
Kafka Consumer Error Handling : confusion ErrorHandling / RetryTemplate
我在使用消息时尝试进行以下错误处理:
- 如果出现序列化错误:在 DLT 中发送消息
- 如果侦听器中的服务抛出异常,则根据异常重试 X 次,并在重试失败或选择异常时在 DLT 中发送消息。
我所拥有的(Spring kafka 2.5.5.RELEASE with 2.5.1 Kafka Client)如下:
@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, MyObject> kafkaListenerContainerFactory() throws ExceptionTechnique {
ConcurrentKafkaListenerContainerFactory<String, MyObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(RECORD);
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(context -> {
Object record = context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
LOGGER_TECHNIQUE.error("Fail to handle message after {} retries. {}",
context.getRetryCount(), record);
return record;
});
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaOperations,
(cr, e) -> new TopicPartition("myTopicToRead.dlq", cr.partition())),
new FixedBackOff(10000L, 2L)));
factory.setConcurrency(kafkaListenerConcurrency);
factory.setStatefulRetry(true);
return factory;
}
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(backOffPolicy());
retryTemplate.setRetryPolicy(retryPolicy());
return retryTemplate;
}
private BackOffPolicy backOffPolicy() {
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(10000);
return fixedBackOffPolicy;
}
private SimpleRetryPolicy retryPolicy() {
Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
exceptionMap.put(IllegalArgumentException.class, false);
exceptionMap.put(TimeoutException.class, false);
exceptionMap.put(ListenerExecutionFailedException.class, true);
// Custom exception
exceptionMap.put(MyTechnicalException.class, false);
exceptionMap.put(MyFunctionalException.class, false);
return new SimpleRetryPolicy(3, exceptionMap, true);
}
现在,如果我发送不可序列化的消息,我的消息会在不重试的情况下进入 DLT -> OK!
在我的 @KafkaHandler
中,我有一个 throw new MyTechnicalException
,被捕获并重新抛出。
我应该没有重试,但我重试了 2 次,每次 20 秒(而不是 10 秒?),并且在 2 次重试后向 DLT 发送了一条消息。
如果我删除 errorHandler,毫不奇怪,尝试 3 次并显示我的日志错误消息...但我需要发送到 DLQ...
如果我删除 RetryTemplate 和 RecoveryCallback,不足为奇,但所有异常都会重试...
问题:
- 有没有办法处理我的用例?怎么样?
- 出厂配置中的RecoveryCallback和ErrorHandler有什么区别?
- 重试取决于异常类型如何工作?它适用于我们的异常吗?
侦听器级别的重试已被弃用,因为错误处理程序已经发展到涵盖 RetryTemplate
提供的所有功能 - 回退、异常分类等。
我在使用消息时尝试进行以下错误处理:
- 如果出现序列化错误:在 DLT 中发送消息
- 如果侦听器中的服务抛出异常,则根据异常重试 X 次,并在重试失败或选择异常时在 DLT 中发送消息。
我所拥有的(Spring kafka 2.5.5.RELEASE with 2.5.1 Kafka Client)如下:
@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, MyObject> kafkaListenerContainerFactory() throws ExceptionTechnique {
ConcurrentKafkaListenerContainerFactory<String, MyObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(RECORD);
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(context -> {
Object record = context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
LOGGER_TECHNIQUE.error("Fail to handle message after {} retries. {}",
context.getRetryCount(), record);
return record;
});
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaOperations,
(cr, e) -> new TopicPartition("myTopicToRead.dlq", cr.partition())),
new FixedBackOff(10000L, 2L)));
factory.setConcurrency(kafkaListenerConcurrency);
factory.setStatefulRetry(true);
return factory;
}
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(backOffPolicy());
retryTemplate.setRetryPolicy(retryPolicy());
return retryTemplate;
}
private BackOffPolicy backOffPolicy() {
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(10000);
return fixedBackOffPolicy;
}
private SimpleRetryPolicy retryPolicy() {
Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
exceptionMap.put(IllegalArgumentException.class, false);
exceptionMap.put(TimeoutException.class, false);
exceptionMap.put(ListenerExecutionFailedException.class, true);
// Custom exception
exceptionMap.put(MyTechnicalException.class, false);
exceptionMap.put(MyFunctionalException.class, false);
return new SimpleRetryPolicy(3, exceptionMap, true);
}
现在,如果我发送不可序列化的消息,我的消息会在不重试的情况下进入 DLT -> OK!
在我的 @KafkaHandler
中,我有一个 throw new MyTechnicalException
,被捕获并重新抛出。
我应该没有重试,但我重试了 2 次,每次 20 秒(而不是 10 秒?),并且在 2 次重试后向 DLT 发送了一条消息。
如果我删除 errorHandler,毫不奇怪,尝试 3 次并显示我的日志错误消息...但我需要发送到 DLQ...
如果我删除 RetryTemplate 和 RecoveryCallback,不足为奇,但所有异常都会重试...
问题:
- 有没有办法处理我的用例?怎么样?
- 出厂配置中的RecoveryCallback和ErrorHandler有什么区别?
- 重试取决于异常类型如何工作?它适用于我们的异常吗?
侦听器级别的重试已被弃用,因为错误处理程序已经发展到涵盖 RetryTemplate
提供的所有功能 - 回退、异常分类等。