何时使用 RecoveryCallback 与 KafkaListenerErrorHandler
when to use RecoveryCallback vs KafkaListenerErrorHandler
我想知道什么时候应该使用 org.springframework.retry.RecoveryCallback 和 org.springframework.kafka.listener.KafkaListenerErrorHandler?
截至今天,我正在使用 class(实现 org.springframework.retry.RecoveryCallback)记录错误消息并将消息发送到 DLT,它正在运行。为了向 DLT 发送消息,我使用了 Spring KafkaTemplate,然后遇到了 KafkaListenerErrorHandler 和 DeadLetterPublishingRecoverer。现在,你能给我建议吗,我应该如何使用 KafkaListenerErrorHandler 和 DeadLetterPublishingRecoverer?这可以替代 RecoveryCallback 吗?
这是我当前的 kafkaListenerContainerFactory 代码
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(primaryConsumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(recoveryCallback);
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setConcurrency(1);
factory.getContainerProperties().setMissingTopicsFatal(false);
return factory; }
如果它现在可以正常工作,为什么要更改它?
有好几层,您可以根据需要选择哪一层进行错误处理。
KafkaListenerErrorHandler
将在重试期间为每次交付尝试调用,因此您通常不会将其与重试一起使用。
- 重试
RecoveryCallback
在重试次数用尽后调用(或者如果您已将异常分类为不可重试则立即调用)。
ErrorHandler
- 在容器中并且在任何侦听器抛出异常时被调用,而不仅仅是 @KafkaListener
s.
使用最新版本的框架,您可以使用配置有 DeadLetterPublishingRecoverer
和 BackOff
.
的 SeekToCurrentErrorHandler
完全替换侦听器级别的重试
DeadLetterPublishingRecoverer
旨在用于容器错误处理程序,因为它需要原始 ConsumerRecord<?, ?>
.
KafkaListenerErrorHandler
只能访问从 ConsumerRecord<?, ?>
.
转换而来的 spring-消息传递 Message<?>
要添加来自 @GaryRussell 的优秀上下文,这就是我目前正在使用的内容:
我正在像这样处理任何错误(a.k.a 异常):
factory.setErrorHandler(new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0L, 0L)));
为了打印这个错误,我在 .DLT
上有一个监听器,我正在打印存储在 header 中的异常堆栈跟踪,如下所示:
@KafkaListener(id = "MY_ID", topics = MY_TOPIC + ".DLT")
public void listenDlt(ConsumerRecord<String, SomeClassName> consumerRecord,
@Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String exceptionStackTrace) {
logger.error(exceptionStackTrace);
}
注意:
我正在使用 logger.error,因为我将所有错误消息重定向到正在监视的错误日志文件。
奖金:
如果您设置以下内容:
logging.level.org.springframework.kafka=DEBUG
您将在 console/log 中看到:
xxx [org.springframework.kafka.KafkaListenerEndpointContainer#7-2-C-1] DEBUG o.s.k.listener.SeekToCurrentErrorHandler - Skipping seek of: ConsumerRecord xxx
xxx [kafka-producer-network-thread | producer-3] DEBUG o.s.k.l.DeadLetterPublishingRecoverer - Successful dead-letter publication: SendResult xxx
如果您有更好的记录方式,我将不胜感激。
谢谢!
干杯
我想知道什么时候应该使用 org.springframework.retry.RecoveryCallback 和 org.springframework.kafka.listener.KafkaListenerErrorHandler?
截至今天,我正在使用 class(实现 org.springframework.retry.RecoveryCallback)记录错误消息并将消息发送到 DLT,它正在运行。为了向 DLT 发送消息,我使用了 Spring KafkaTemplate,然后遇到了 KafkaListenerErrorHandler 和 DeadLetterPublishingRecoverer。现在,你能给我建议吗,我应该如何使用 KafkaListenerErrorHandler 和 DeadLetterPublishingRecoverer?这可以替代 RecoveryCallback 吗?
这是我当前的 kafkaListenerContainerFactory 代码
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(primaryConsumerFactory()); factory.setRetryTemplate(retryTemplate()); factory.setRecoveryCallback(recoveryCallback); factory.getContainerProperties().setAckMode(AckMode.RECORD); factory.setConcurrency(1); factory.getContainerProperties().setMissingTopicsFatal(false); return factory; }
如果它现在可以正常工作,为什么要更改它?
有好几层,您可以根据需要选择哪一层进行错误处理。
KafkaListenerErrorHandler
将在重试期间为每次交付尝试调用,因此您通常不会将其与重试一起使用。- 重试
RecoveryCallback
在重试次数用尽后调用(或者如果您已将异常分类为不可重试则立即调用)。 ErrorHandler
- 在容器中并且在任何侦听器抛出异常时被调用,而不仅仅是@KafkaListener
s.
使用最新版本的框架,您可以使用配置有 DeadLetterPublishingRecoverer
和 BackOff
.
SeekToCurrentErrorHandler
完全替换侦听器级别的重试
DeadLetterPublishingRecoverer
旨在用于容器错误处理程序,因为它需要原始 ConsumerRecord<?, ?>
.
KafkaListenerErrorHandler
只能访问从 ConsumerRecord<?, ?>
.
Message<?>
要添加来自 @GaryRussell 的优秀上下文,这就是我目前正在使用的内容:
我正在像这样处理任何错误(a.k.a 异常):
factory.setErrorHandler(new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0L, 0L)));
为了打印这个错误,我在 .DLT
上有一个监听器,我正在打印存储在 header 中的异常堆栈跟踪,如下所示:
@KafkaListener(id = "MY_ID", topics = MY_TOPIC + ".DLT")
public void listenDlt(ConsumerRecord<String, SomeClassName> consumerRecord,
@Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String exceptionStackTrace) {
logger.error(exceptionStackTrace);
}
注意:
我正在使用 logger.error,因为我将所有错误消息重定向到正在监视的错误日志文件。
奖金:
如果您设置以下内容:
logging.level.org.springframework.kafka=DEBUG
您将在 console/log 中看到:
xxx [org.springframework.kafka.KafkaListenerEndpointContainer#7-2-C-1] DEBUG o.s.k.listener.SeekToCurrentErrorHandler - Skipping seek of: ConsumerRecord xxx
xxx [kafka-producer-network-thread | producer-3] DEBUG o.s.k.l.DeadLetterPublishingRecoverer - Successful dead-letter publication: SendResult xxx
如果您有更好的记录方式,我将不胜感激。
谢谢!
干杯