在 Spring Kafka 中结合阻塞和非阻塞重试
Combining blocking and non-blocking retries in Spring Kafka
我正在尝试实现具有单一主题固定回退的非阻塞 rerties。
感谢文档 https://docs.spring.io/spring-kafka/reference/html/#single-topic-fixed-delay-retries。
我能够这样做
现在我还需要对主要主题进行几次 blocked/local 重试。我一直在尝试使用 DefaultErrorHandler
来实现这一点,如下所示:
@Bean
public DefaultErrorHandler retryErrorHandler() {
return new DefaultErrorHandler(new FixedBackOff(2000, 3));
}
这似乎不适用于 RetryableTopic
。
我也尝试过以下方法 retry-topic-combine-blocking
https://docs.spring.io/spring-kafka/reference/html/#retry-topic-combine-blocking 使用 ListenerContainerFactoryConfigurer
但我在这里面临的问题是创建 bean KafkaConsumerBackoffManager
、DeadLetterPublishingRecovererFactory
,尤其是 KafkaConsumerBackoffManager
。
我需要知道这是使用 spring kafka 框架实现此目的的另一种方法,还是有一种方法可以在 bean 之上构建?
我们目前正在努力改进 non-blocking 重试组件的配置。
现在,如记录 here,您应该注入这些 bean,例如:
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
return lcfc;
}}
此外,还有一个已知问题,如果您尝试在处理第一个 @KafkaListener
具有可重试主题的 bean 之前注入 bean,该功能的组件的 bean 将不会出现在上下文中,并且会抛出一个错误。
你也遇到过这种情况吗?
我们目前正在为此开发 fix,但如果这是您的问题,我们应该能够解决这个问题。
编辑:由于问题是组件尚未实例化,最有保证的解决方法是自己提供组件。
下面是有关如何执行此操作的示例。当然,如果您需要进一步的定制,请相应地进行调整。
@Configuration
public static class SO71705876Configuration {
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, Clock.systemUTC());
lcfc.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class);
lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
return lcfc;
}
@Bean(name = RetryTopicInternalBeanNames.KAFKA_CONSUMER_BACKOFF_MANAGER)
public KafkaConsumerBackoffManager backOffManager(ApplicationContext context) {
PartitionPausingBackOffManagerFactory managerFactory =
new PartitionPausingBackOffManagerFactory();
managerFactory.setApplicationContext(context);
return managerFactory.create();
}
@Bean(name = RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
public DeadLetterPublishingRecovererFactory dlprFactory(DestinationTopicResolver resolver) {
return new DeadLetterPublishingRecovererFactory(resolver);
}
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver destinationTopicResolver(ApplicationContext context) {
return new DefaultDestinationTopicResolver(Clock.systemUTC(), context);
}
在下一个版本中,这应该不再是问题了。请让我知道这是否适合您,或者是否需要对此解决方法进行任何进一步调整。
谢谢。
我正在尝试实现具有单一主题固定回退的非阻塞 rerties。
感谢文档 https://docs.spring.io/spring-kafka/reference/html/#single-topic-fixed-delay-retries。
我能够这样做现在我还需要对主要主题进行几次 blocked/local 重试。我一直在尝试使用 DefaultErrorHandler
来实现这一点,如下所示:
@Bean
public DefaultErrorHandler retryErrorHandler() {
return new DefaultErrorHandler(new FixedBackOff(2000, 3));
}
这似乎不适用于 RetryableTopic
。
我也尝试过以下方法 retry-topic-combine-blocking
https://docs.spring.io/spring-kafka/reference/html/#retry-topic-combine-blocking 使用 ListenerContainerFactoryConfigurer
但我在这里面临的问题是创建 bean KafkaConsumerBackoffManager
、DeadLetterPublishingRecovererFactory
,尤其是 KafkaConsumerBackoffManager
。
我需要知道这是使用 spring kafka 框架实现此目的的另一种方法,还是有一种方法可以在 bean 之上构建?
我们目前正在努力改进 non-blocking 重试组件的配置。
现在,如记录 here,您应该注入这些 bean,例如:
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
return lcfc;
}}
此外,还有一个已知问题,如果您尝试在处理第一个 @KafkaListener
具有可重试主题的 bean 之前注入 bean,该功能的组件的 bean 将不会出现在上下文中,并且会抛出一个错误。
你也遇到过这种情况吗?
我们目前正在为此开发 fix,但如果这是您的问题,我们应该能够解决这个问题。
编辑:由于问题是组件尚未实例化,最有保证的解决方法是自己提供组件。
下面是有关如何执行此操作的示例。当然,如果您需要进一步的定制,请相应地进行调整。
@Configuration
public static class SO71705876Configuration {
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, Clock.systemUTC());
lcfc.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class);
lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
return lcfc;
}
@Bean(name = RetryTopicInternalBeanNames.KAFKA_CONSUMER_BACKOFF_MANAGER)
public KafkaConsumerBackoffManager backOffManager(ApplicationContext context) {
PartitionPausingBackOffManagerFactory managerFactory =
new PartitionPausingBackOffManagerFactory();
managerFactory.setApplicationContext(context);
return managerFactory.create();
}
@Bean(name = RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
public DeadLetterPublishingRecovererFactory dlprFactory(DestinationTopicResolver resolver) {
return new DeadLetterPublishingRecovererFactory(resolver);
}
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver destinationTopicResolver(ApplicationContext context) {
return new DefaultDestinationTopicResolver(Clock.systemUTC(), context);
}
在下一个版本中,这应该不再是问题了。请让我知道这是否适合您,或者是否需要对此解决方法进行任何进一步调整。
谢谢。