何时使用 RecoveryCallback 与 KafkaListenerErrorHandler

when to use RecoveryCallback vs KafkaListenerErrorHandler

我想知道什么时候应该使用 org.springframework.retry.RecoveryCallback 和 org.springframework.kafka.listener.KafkaListenerErrorHandler?

截至今天,我正在使用 class(实现 org.springframework.retry.RecoveryCallback)记录错误消息并将消息发送到 DLT,它正在运行。为了向 DLT 发送消息,我使用了 Spring KafkaTemplate,然后遇到了 KafkaListenerErrorHandler 和 DeadLetterPublishingRecoverer。现在,你能给我建议吗,我应该如何使用 KafkaListenerErrorHandler 和 DeadLetterPublishingRecoverer?这可以替代 RecoveryCallback 吗?

这是我当前的 kafkaListenerContainerFactory 代码

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(primaryConsumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(recoveryCallback);
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setConcurrency(1);  
factory.getContainerProperties().setMissingTopicsFatal(false);
return factory;   }

如果它现在可以正常工作,为什么要更改它?

有好几层,您可以根据需要选择哪一层进行错误处理。

  • KafkaListenerErrorHandler 将在重试期间为每次交付尝试调用,因此您通常不会将其与重试一起使用。
  • 重试 RecoveryCallback 在重试次数用尽后调用(或者如果您已将异常分类为不可重试则立即调用)。
  • ErrorHandler - 在容器中并且在任何侦听器抛出异常时被调用,而不仅仅是 @KafkaListeners.

使用最新版本的框架,您可以使用配置有 DeadLetterPublishingRecovererBackOff.

SeekToCurrentErrorHandler 完全替换侦听器级别的重试

DeadLetterPublishingRecoverer 旨在用于容器错误处理程序,因为它需要原始 ConsumerRecord<?, ?>.

KafkaListenerErrorHandler 只能访问从 ConsumerRecord<?, ?>.

转换而来的 spring-消息传递 Message<?>

要添加来自 @GaryRussell 的优秀上下文,这就是我目前正在使用的内容:

我正在像这样处理任何错误(a.k.a 异常):

factory.setErrorHandler(new SeekToCurrentErrorHandler(
    new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0L, 0L)));

为了打印这个错误,我在 .DLT 上有一个监听器,我正在打印存储在 header 中的异常堆栈跟踪,如下所示:

@KafkaListener(id = "MY_ID", topics = MY_TOPIC + ".DLT")
public void listenDlt(ConsumerRecord<String, SomeClassName> consumerRecord,
    @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String exceptionStackTrace) {
            
    logger.error(exceptionStackTrace);
}

注意:
我正在使用 logger.error,因为我将所有错误消息重定向到正在监视的错误日志文件。

奖金:
如果您设置以下内容: logging.level.org.springframework.kafka=DEBUG

您将在 console/log 中看到:

xxx [org.springframework.kafka.KafkaListenerEndpointContainer#7-2-C-1] DEBUG o.s.k.listener.SeekToCurrentErrorHandler - Skipping seek of: ConsumerRecord xxx
xxx [kafka-producer-network-thread | producer-3] DEBUG o.s.k.l.DeadLetterPublishingRecoverer - Successful dead-letter publication: SendResult xxx

如果您有更好的记录方式,我将不胜感激。 谢谢!

干杯