如何用 DefaultErrorHandler (spring-kafka) 替换已弃用的 SeekToCurrentErrorHandler?
How to replace deprecated SeekToCurrentErrorHandler with DefaultErrorHandler (spring-kafka)?
我正在尝试找到一种方法来使用新的 DefaultErrorHandler 而不是 spring-kafka 2.8.1 中已弃用的 SeekToCurrentErrorHandler,以便在出现错误时覆盖重试默认行为。我想“停止”重试过程,因此如果发生错误,则不应进行重试。
现在我在配置 class 中拥有以下按预期工作的 bean:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setErrorHandler(new **SeekToCurrentErrorHandler(new FixedBackOff(0L, 1L)**));
factory.setConsumerFactory(requestConsumerFactory());
factory.setReplyTemplate(kafkaTemplate());
return factory;
}
由于在这个 spring kafka 版本中,STCEH 已被弃用,我尝试在相同的配置中执行以下操作 class:
@Bean
public DefaultErrorHandler eh() {
return new DefaultErrorHandler(new FixedBackOff(0, 1));
}
不过好像不行。万一出错,重试次数是默认的,正如我在日志中看到的那样:
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] 错误 DefaultErrorHandler - Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} 已耗尽 topicX
应如何使用此 DefaultErrorHandler 以实现所需的行为?还是我应该使用其他东西?
提前致谢!
factory.setCommonErrorHandler(new Default....)
CommonErrorHandler
bean 的引导自动配置需要 Boot 2.6。
https://github.com/spring-projects/spring-boot/commit/c3583a4b06cff3f53b3322cd79f2b64d17211d0e
factory.setErrorHandler(新 SeekToCurrentErrorHandler(新 FixedBackOff(0L, 1L)));实际上,它最多会重试 1 次投递(2 次投递尝试)。 (https://docs.spring.io/spring-kafka/docs/2.7.8/reference/html/#seek-to-current)
默认的重试次数是**9(( (FixedBackOff(0L, 9L)) 而不是1 (https://docs.spring.io/spring-kafka/docs/2.8.1/reference/html/#default-eh)
你应该尝试 setCommonErrorHandler
而不是像 factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0L, 0L));
这样的 setErrorHandler
我正在尝试找到一种方法来使用新的 DefaultErrorHandler 而不是 spring-kafka 2.8.1 中已弃用的 SeekToCurrentErrorHandler,以便在出现错误时覆盖重试默认行为。我想“停止”重试过程,因此如果发生错误,则不应进行重试。
现在我在配置 class 中拥有以下按预期工作的 bean:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setErrorHandler(new **SeekToCurrentErrorHandler(new FixedBackOff(0L, 1L)**));
factory.setConsumerFactory(requestConsumerFactory());
factory.setReplyTemplate(kafkaTemplate());
return factory;
}
由于在这个 spring kafka 版本中,STCEH 已被弃用,我尝试在相同的配置中执行以下操作 class:
@Bean
public DefaultErrorHandler eh() {
return new DefaultErrorHandler(new FixedBackOff(0, 1));
}
不过好像不行。万一出错,重试次数是默认的,正如我在日志中看到的那样:
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] 错误 DefaultErrorHandler - Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} 已耗尽 topicX
应如何使用此 DefaultErrorHandler 以实现所需的行为?还是我应该使用其他东西?
提前致谢!
factory.setCommonErrorHandler(new Default....)
CommonErrorHandler
bean 的引导自动配置需要 Boot 2.6。
https://github.com/spring-projects/spring-boot/commit/c3583a4b06cff3f53b3322cd79f2b64d17211d0e
factory.setErrorHandler(新 SeekToCurrentErrorHandler(新 FixedBackOff(0L, 1L)));实际上,它最多会重试 1 次投递(2 次投递尝试)。 (https://docs.spring.io/spring-kafka/docs/2.7.8/reference/html/#seek-to-current)
默认的重试次数是**9(( (FixedBackOff(0L, 9L)) 而不是1 (https://docs.spring.io/spring-kafka/docs/2.8.1/reference/html/#default-eh)
你应该尝试
这样的setCommonErrorHandler
而不是像factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0L, 0L));
setErrorHandler