使用 RetryTopicConfiguration 时在错误级别记录 BackoffExceptions
BackoffExceptions are logged at error level when using RetryTopicConfiguration
我是最近添加的 RetryTopicConfiguration
的满意用户,但是有一个小问题困扰着我。
我使用的设置如下:
@Bean
public RetryTopicConfiguration retryTopicConfiguration(
KafkaTemplate<String, String> template,
@Value("${kafka.topic.in}") String topicToInclude,
@Value("${spring.application.name}") String appName) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(5000L)
.maxAttempts(3)
.retryTopicSuffix("-" + appName + ".retry")
.suffixTopicsWithIndexValues()
.dltSuffix("-" + appName + ".dlq")
.includeTopic(topicToInclude)
.dltHandlerMethod(KAFKA_EVENT_LISTENER, "handleDltEvent")
.create(template);
}
当侦听器抛出触发重试的异常时,DefaultErrorHandler
将在错误级别记录 KafkaBackoffException
。
对于 ,建议使用 ListenerContainerFactoryConfigurer
但这不会删除所有错误日志,因为我仍然在日志中看到以下内容:
2022-04-02 17:34:33.340 ERROR 8054 --- [e.retry-0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Recovery of record (topic-spring-kafka-logging-issue.retry-0-0@0) failed
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic topic-spring-kafka-logging-issue.retry-0 is not ready for consumption, backing off for approx. 4468 millis.
是否可以在不添加自定义的情况下更改日志级别ErrorHandler
?
Spring-启动版本:2.6.6
Spring-卡夫卡版本:2.8.4
JDK版本:11
示例项目:here
感谢您提出如此完整的问题。由于新的 combine blocking and non-blocking exceptions 功能,这是 Apache Kafka 2.8.4
的 Spring 的一个已知问题,并且已针对 2.8.5
.
修复
解决方法是清除阻塞异常机制,例如:
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
lcfc.setBlockingRetriesBackOff(new FixedBackOff(0, 0));
lcfc.setErrorHandlerCustomizer(eh -> ((DefaultErrorHandler) eh).setClassifications(Collections.emptyMap(), true));
return lcfc;
}
请告诉我这是否适合你。
谢谢。
编辑:
此解决方法仅禁用 blocking retries
,因为根据原始答案中的 link,2.8.4 可以与 non-blocking
一起使用。 non-blocking retries
的异常 class 化在 DefaultDestinationTopicResolver
class 中,您可以设置 FATAL
异常 here.
编辑:或者,您可以通过添加 Spring 快照存储库来使用 Spring Kafka 2.8.5-SNAPSHOT
版本,例如:
repositories {
maven {
url 'https://repo.spring.io/snapshot'
}
}
dependencies {
implementation 'org.springframework.kafka:spring-kafka:2.8.5-SNAPSHOT'
}
您也可以降级到 Spring Kafka 2.8.3
。
正如 Gary Russell 指出的那样,如果您的应用程序已经在生产中,则不应使用 SNAPSHOT
版本,并且 2.8.5
已过时 in a couple of weeks。
编辑 2:很高兴听到您对这个功能感到满意!
我是最近添加的 RetryTopicConfiguration
的满意用户,但是有一个小问题困扰着我。
我使用的设置如下:
@Bean
public RetryTopicConfiguration retryTopicConfiguration(
KafkaTemplate<String, String> template,
@Value("${kafka.topic.in}") String topicToInclude,
@Value("${spring.application.name}") String appName) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(5000L)
.maxAttempts(3)
.retryTopicSuffix("-" + appName + ".retry")
.suffixTopicsWithIndexValues()
.dltSuffix("-" + appName + ".dlq")
.includeTopic(topicToInclude)
.dltHandlerMethod(KAFKA_EVENT_LISTENER, "handleDltEvent")
.create(template);
}
当侦听器抛出触发重试的异常时,DefaultErrorHandler
将在错误级别记录 KafkaBackoffException
。
对于 ListenerContainerFactoryConfigurer
但这不会删除所有错误日志,因为我仍然在日志中看到以下内容:
2022-04-02 17:34:33.340 ERROR 8054 --- [e.retry-0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Recovery of record (topic-spring-kafka-logging-issue.retry-0-0@0) failed
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic topic-spring-kafka-logging-issue.retry-0 is not ready for consumption, backing off for approx. 4468 millis.
是否可以在不添加自定义的情况下更改日志级别ErrorHandler
?
Spring-启动版本:2.6.6
Spring-卡夫卡版本:2.8.4
JDK版本:11
示例项目:here
感谢您提出如此完整的问题。由于新的 combine blocking and non-blocking exceptions 功能,这是 Apache Kafka 2.8.4
的 Spring 的一个已知问题,并且已针对 2.8.5
.
解决方法是清除阻塞异常机制,例如:
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
lcfc.setBlockingRetriesBackOff(new FixedBackOff(0, 0));
lcfc.setErrorHandlerCustomizer(eh -> ((DefaultErrorHandler) eh).setClassifications(Collections.emptyMap(), true));
return lcfc;
}
请告诉我这是否适合你。
谢谢。
编辑:
此解决方法仅禁用 blocking retries
,因为根据原始答案中的 link,2.8.4 可以与 non-blocking
一起使用。 non-blocking retries
的异常 class 化在 DefaultDestinationTopicResolver
class 中,您可以设置 FATAL
异常 here.
编辑:或者,您可以通过添加 Spring 快照存储库来使用 Spring Kafka 2.8.5-SNAPSHOT
版本,例如:
repositories {
maven {
url 'https://repo.spring.io/snapshot'
}
}
dependencies {
implementation 'org.springframework.kafka:spring-kafka:2.8.5-SNAPSHOT'
}
您也可以降级到 Spring Kafka 2.8.3
。
正如 Gary Russell 指出的那样,如果您的应用程序已经在生产中,则不应使用 SNAPSHOT
版本,并且 2.8.5
已过时 in a couple of weeks。
编辑 2:很高兴听到您对这个功能感到满意!