How to set the retry interval when using SeekToCurrentErrorHandler in Spring Kafka

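I am using SeekToCurrentErrorHandler in a spring-boot application to handle container listener errors. When an exception occurs, I want to set a retry interval: the container should wait for some time and then retry, up to a maximum number of attempts.

I tried adding a RetryTemplate with a back-off policy, but it does not work properly. Once the error has occurred the maximum number of times, SeekToCurrentErrorHandler is called 2 times.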

    @Bean
    public RetryPolicy retryPolicy() {
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
        return simpleRetryPolicy;
    }


    @Bean
    public BackOffPolicy backOffPolicy() {
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(retryInterval);
        return backOffPolicy;
    }


    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy());
        retryTemplate.setBackOffPolicy(backOffPolicy());
        return retryTemplate;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            ChainedKafkaTransactionManager<String, String> chainedTM) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setSyncCommits(true);
    // ------->>     factory.setRetryTemplate(retryTemplate());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
        // handle errors 
        }, retryMaxAttempts);
        factory.setErrorHandler(errorHandler);
        log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
        return factory;
    }


How do I set the retry interval when using SeekToCurrentErrorHandler?

This is a new feature in the next release (2.3).
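
As a minimal sketch of what this could look like on spring-kafka 2.3 or later, the error handler can be given a BackOff instead of a plain max-failures count. The class name `ErrorHandlerConfig` and the values of `retryIntervalMs` and `maxRetries` are assumptions made for illustration; they are not taken from the question. Note that with `FixedBackOff` the second argument counts retries after the first delivery, so check it against the attempt count you actually want.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class ErrorHandlerConfig { // hypothetical class name

    @Bean
    public SeekToCurrentErrorHandler errorHandler() {
        long retryIntervalMs = 5_000L; // assumed value: wait 5 s between deliveries
        long maxRetries = 2L;          // assumed value: retries after the first attempt

        return new SeekToCurrentErrorHandler(
                (record, exception) -> {
                    // Recoverer: called once the back-off is exhausted.
                    // Handle the failed record here (log it, publish it elsewhere, ...).
                },
                new FixedBackOff(retryIntervalMs, maxRetries));
    }
}
```

The bean would then be passed to `factory.setErrorHandler(...)` in place of the handler built inside `kafkaListenerContainerFactory`, and `factory.setRetryTemplate(...)` would stay disabled so that only one retry mechanism is active.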

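You can, however, use a retry template to do it, but the total number of attempts will then be the product of the two settings: the retry template's max attempts multiplied by the error handler's max failures.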
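
The multiplication can be illustrated with a small plain-Java model. This is a simplified simulation, not Spring code: it assumes a listener that always fails, an inner loop standing in for the RetryTemplate, and an outer loop standing in for the redeliveries triggered by SeekToCurrentErrorHandler. The class and method names are hypothetical.

```java
public class RetryMultiplication {

    /**
     * Counts how often an always-failing listener is invoked when a retry
     * template (inner loop) is combined with a seek-and-redeliver error
     * handler (outer loop).
     */
    static int countListenerCalls(int templateMaxAttempts, int handlerMaxFailures) {
        int calls = 0;
        // Outer loop: the error handler seeks back, so the record is delivered again.
        for (int delivery = 0; delivery < handlerMaxFailures; delivery++) {
            // Inner loop: the retry template retries within a single delivery.
            for (int attempt = 0; attempt < templateMaxAttempts; attempt++) {
                calls++; // the listener is invoked and throws
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        // With both settings at 3, the listener runs 3 * 3 times.
        System.out.println(countListenerCalls(3, 3)); // prints 9
    }
}
```

In the question both values come from the same `retryMaxAttempts` property, so under this model enabling the retry template squares the number of listener invocations rather than adding a delay to the existing attempts.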