如何用 DefaultErrorHandler (spring-kafka) 替换已弃用的 SeekToCurrentErrorHandler?

How to replace deprecated SeekToCurrentErrorHandler with DefaultErrorHandler (spring-kafka)?

我正在尝试找到一种方法来使用新的 DefaultErrorHandler 而不是 spring-kafka 2.8.1 中已弃用的 SeekToCurrentErrorHandler,以便在出现错误时覆盖重试默认行为。我想“停止”重试过程,因此如果发生错误,则不应进行重试。

现在我在配置 class 中拥有以下按预期工作的 bean:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new **SeekToCurrentErrorHandler(new FixedBackOff(0L, 1L)**));
    factory.setConsumerFactory(requestConsumerFactory());
    factory.setReplyTemplate(kafkaTemplate());
    return factory;
}

由于在这个 spring kafka 版本中,STCEH 已被弃用,我尝试在相同的配置中执行以下操作 class:

@Bean
public DefaultErrorHandler eh() {
    return new DefaultErrorHandler(new FixedBackOff(0, 1));
}

不过好像不行。万一出错,重试次数是默认的,正如我在日志中看到的那样:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] 错误 DefaultErrorHandler - Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} 已耗尽 topicX

应如何使用此 DefaultErrorHandler 以实现所需的行为?还是我应该使用其他东西?

提前致谢!

factory.setCommonErrorHandler(new Default....)

CommonErrorHandler bean 的引导自动配置需要 Boot 2.6。

https://github.com/spring-projects/spring-boot/commit/c3583a4b06cff3f53b3322cd79f2b64d17211d0e

  1. factory.setErrorHandler(新 SeekToCurrentErrorHandler(新 FixedBackOff(0L, 1L)));实际上,它最多会重试 1 次投递(2 次投递尝试)。 (https://docs.spring.io/spring-kafka/docs/2.7.8/reference/html/#seek-to-current)

  2. 默认的重试次数是**9(( (FixedBackOff(0L, 9L)) 而不是1 (https://docs.spring.io/spring-kafka/docs/2.8.1/reference/html/#default-eh)

  3. 你应该尝试 setCommonErrorHandler 而不是像 factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0L, 0L));

    这样的 setErrorHandler