使用 DeadLetterPublishingRecoverer 的状态重试导致 RetryCacheCapacityExceededException

Stateful-Retry with DeadLetterPublishingRecoverer causing RetryCacheCapacityExceededException

我的容器工厂有一个 SeekToCurrentErrorHandler,它使用 DeadLetterPublishingRecoverer 发布到 DLT,某些 'NotRetryableException' 类型的异常,并无限次地为其他类型的异常寻找相同的偏移量。使用此设置,在导致不可重试异常的一定数量的有效负载之后,存储重试上下文的映射 - MapRetryContextCache (spring-retry) 溢出并抛出 RetryCacheCapacityExceededException。从最初的外观来看,DLT 恢复器要处理的消息的重试上下文不会从 MapRetryContextCache 中删除。或者我的配置不正确。

SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(kafkaTemplate),-1);
eh.addNotRetryableException(SomeNonRetryableException.class);
        eh.setCommitRecovered(true);
        ConcurrentKafkaListenerContainerFactory<String, String> factory
                = getContainerFactory();
        factory.setErrorHandler(eh);
        factory.setRetryTemplate(retryTemplate);
        factory.setStatefulRetry(true);

为了清除缓存,您必须在重试模板中进行恢复,而不是在错误处理程序中。

@SpringBootApplication
public class So56846940Application {

    public static void main(String[] args) {
        SpringApplication.run(So56846940Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so56846940").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topicDLT() {
        return TopicBuilder.name("so56846940.DLT").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template,
            ConcurrentKafkaListenerContainerFactory<String, String> factory,
            DeadLetterPublishingRecoverer recoverer) {

        factory.setRetryTemplate(new RetryTemplate());
        factory.setStatefulRetry(true);
        factory.setRecoveryCallback(context -> {
            recoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
                    (Exception) context.getLastThrowable());
            return null;
        });

        return args -> IntStream.range(0, 5000).forEach(i -> template.send("so56846940", "foo"));
    }

    @KafkaListener(id = "so56846940", topics = "so56846940")
    public void listen(String in) {
        System.out.println(in);
        throw new RuntimeException();
    }

    @Bean
    public DeadLetterPublishingRecoverer recoverer(KafkaTemplate<String, String> template) {
        return new DeadLetterPublishingRecoverer(template);
    }

    @Bean
    public SeekToCurrentErrorHandler eh() {
        return new SeekToCurrentErrorHandler(4);
    }

}

错误处理程序的重试次数必须至少与重试模板一样多,以便重试次数耗尽并清除缓存。

您还应该使用与错误处理程序相同的不可重试异常来配置 RetryTemplate。

我们会在参考手册中说明。