在 Spring Kafka 中结合阻塞和非阻塞重试

Combining blocking and non-blocking retries in Spring Kafka

我正在尝试实现具有单一主题固定回退的非阻塞 rerties。

感谢文档 https://docs.spring.io/spring-kafka/reference/html/#single-topic-fixed-delay-retries

我能够这样做

现在我还需要对主要主题进行几次 blocked/local 重试。我一直在尝试使用 DefaultErrorHandler 来实现这一点,如下所示:

@Bean
public DefaultErrorHandler retryErrorHandler() {
        return new DefaultErrorHandler(new FixedBackOff(2000, 3));
}

这似乎不适用于 RetryableTopic

我也尝试过以下方法 retry-topic-combine-blocking https://docs.spring.io/spring-kafka/reference/html/#retry-topic-combine-blocking 使用 ListenerContainerFactoryConfigurer 但我在这里面临的问题是创建 bean KafkaConsumerBackoffManagerDeadLetterPublishingRecovererFactory,尤其是 KafkaConsumerBackoffManager

我需要知道这是使用 spring kafka 框架实现此目的的另一种方法,还是有一种方法可以在 bean 之上构建?

我们目前正在努力改进 non-blocking 重试组件的配置。

现在,如记录 here,您应该注入这些 bean,例如:

@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
                                               DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
                                               @Qualifier(RetryTopicInternalBeanNames
                                                       .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
    ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
    lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
    lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
    return lcfc;
}}

此外,还有一个已知问题,如果您尝试在处理第一个 @KafkaListener 具有可重试主题的 bean 之前注入 bean,该功能的组件的 bean 将不会出现在上下文中,并且会抛出一个错误。

你也遇到过这种情况吗?

我们目前正在为此开发 fix,但如果这是您的问题,我们应该能够解决这个问题。

编辑:由于问题是组件尚未实例化,最有保证的解决方法是自己提供组件。

下面是有关如何执行此操作的示例。当然,如果您需要进一步的定制,请相应地进行调整。

    @Configuration
    public static class SO71705876Configuration {

        @Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
        public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
                DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory) {
            ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, Clock.systemUTC());
            lcfc.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class);
            lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
            return lcfc;
        }

        @Bean(name = RetryTopicInternalBeanNames.KAFKA_CONSUMER_BACKOFF_MANAGER)
        public KafkaConsumerBackoffManager backOffManager(ApplicationContext context) {
            PartitionPausingBackOffManagerFactory managerFactory =
                    new PartitionPausingBackOffManagerFactory();
            managerFactory.setApplicationContext(context);
            return managerFactory.create();
        }

        @Bean(name = RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
        public DeadLetterPublishingRecovererFactory dlprFactory(DestinationTopicResolver resolver) {
            return new DeadLetterPublishingRecovererFactory(resolver);
        }

        @Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
        public DestinationTopicResolver destinationTopicResolver(ApplicationContext context) {
            return new DefaultDestinationTopicResolver(Clock.systemUTC(), context);
        }

在下一个版本中,这应该不再是问题了。请让我知道这是否适合您,或者是否需要对此解决方法进行任何进一步调整。

谢谢。