使用 RetryTopicConfiguration 时在错误级别记录 BackoffExceptions

BackoffExceptions are logged at error level when using RetryTopicConfiguration

我是最近添加的 RetryTopicConfiguration 的满意用户,但是有一个小问题困扰着我。 我使用的设置如下:

@Bean
public RetryTopicConfiguration retryTopicConfiguration(
    KafkaTemplate<String, String> template,
    @Value("${kafka.topic.in}") String topicToInclude,
    @Value("${spring.application.name}") String appName) {
    return RetryTopicConfigurationBuilder
        .newInstance()
        .fixedBackOff(5000L)
        .maxAttempts(3)
        .retryTopicSuffix("-" + appName + ".retry")
        .suffixTopicsWithIndexValues()
        .dltSuffix("-" + appName + ".dlq")
        .includeTopic(topicToInclude)
        .dltHandlerMethod(KAFKA_EVENT_LISTENER, "handleDltEvent")
        .create(template);
}

当侦听器抛出触发重试的异常时,DefaultErrorHandler 将在错误级别记录 KafkaBackoffException

对于 ,建议使用 ListenerContainerFactoryConfigurer 但这不会删除所有错误日志,因为我仍然在日志中看到以下内容:

2022-04-02 17:34:33.340 ERROR 8054 --- [e.retry-0-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Recovery of record (topic-spring-kafka-logging-issue.retry-0-0@0) failed

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic topic-spring-kafka-logging-issue.retry-0 is not ready for consumption, backing off for approx. 4468 millis.

是否可以在不添加自定义的情况下更改日志级别ErrorHandler

Spring-启动版本:2.6.6
Spring-卡夫卡版本:2.8.4
JDK版本:11

示例项目:here

感谢您提出如此完整的问题。由于新的 combine blocking and non-blocking exceptions 功能,这是 Apache Kafka 2.8.4 的 Spring 的一个已知问题,并且已针对 2.8.5.

修复

解决方法是清除阻塞异常机制,例如:

@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
                        DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
                        @Qualifier(RetryTopicInternalBeanNames
                            .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
    ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);

    lcfc.setBlockingRetriesBackOff(new FixedBackOff(0, 0));
    lcfc.setErrorHandlerCustomizer(eh -> ((DefaultErrorHandler) eh).setClassifications(Collections.emptyMap(), true));
    return lcfc;
}

请告诉我这是否适合你。

谢谢。

编辑: 此解决方法仅禁用 blocking retries,因为根据原始答案中的 link,2.8.4 可以与 non-blocking 一起使用。 non-blocking retries 的异常 class 化在 DefaultDestinationTopicResolver class 中,您可以设置 FATAL 异常 here.

编辑:或者,您可以通过添加 Spring 快照存储库来使用 Spring Kafka 2.8.5-SNAPSHOT 版本,例如:

repositories {
    maven {
        url 'https://repo.spring.io/snapshot'
    }
}

dependencies {
    implementation 'org.springframework.kafka:spring-kafka:2.8.5-SNAPSHOT'
}

您也可以降级到 Spring Kafka 2.8.3

正如 Gary Russell 指出的那样,如果您的应用程序已经在生产中,则不应使用 SNAPSHOT 版本,并且 2.8.5 已过时 in a couple of weeks

编辑 2:很高兴听到您对这个功能感到满意!