Spring cloud Kafka 失败时无限重试

Spring cloud Kafka does infinite retry when it fails

目前,我遇到一个问题,其中一个消费者函数抛出错误,导致 Kafka 一次又一次地重试记录。

@Bean
public Consumer<List<RuleEngineSubject>> processCohort() {
    return personDtoList -> {
        
        for(RuleEngineSubject subject : personDtoList)
            processSubject(subject);

    };
}

这是消费者 processSubject 抛出导致其失败的自定义错误。

processCohort-in-0:
  destination: internal-process-cohort
  consumer:
    max-attempts: 1
    batch-mode: true
    concurrency: 10
  group: process-cohort-group

以上是我的Kafka活页夹。

目前,我正在尝试重试 2 次,然后发送到死信队列,但我一直没有成功,不确定采用哪种方法是正确的。

我已经尝试实现一个自定义处理程序,该处理程序将在失败时处理错误但不会再次重试,而且我不确定如何发送到死信队列

   @Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {

    return (container, dest, group) -> {
        if (group.equals("process-cohort-group")) {
            container.setBatchErrorHandler(new BatchErrorHandler() {
                @Override
                public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
                    System.out.println(data.records(dest).iterator().);
                    data.records(dest).forEach(r -> {
                        System.out.println(r.value());
                    });
                    System.out.println("failed payload='{}'" + thrownException.getLocalizedMessage());
                }
            });
        }
      
    };

}

这会停止无限重试,但不会发送死信队列。我能否获得有关如何重试两次然后发送死信队列的建议。据我了解,批处理侦听器在出现错误时无法恢复,有人可以帮助阐明这一点

您需要在监听器容器中配置合适的错误处理器;您可以在绑定中禁用重试和 dlq 并改用 DeadLetterPublishingRecoverer 。查看答案

重试15次后扔到topicname.DLT话题

@Bean
      public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setCommonErrorHandler(
            new DefaultErrorHandler(
                new DeadLetterPublishingRecoverer(kafkaTemplate()), kafkaBackOffPolicy()));
        factory.setConsumerFactory(kafkaConsumerFactory());
        return factory;
      }
    
@Bean
      public ExponentialBackOffWithMaxRetries kafkaBackOffPolicy() {
        var exponentialBackOff = new ExponentialBackOffWithMaxRetries(15);
        exponentialBackOff.setInitialInterval(Duration.ofMillis(500).toMillis());
        exponentialBackOff.setMultiplier(2);
        exponentialBackOff.setMaxInterval(Duration.ofSeconds(2).toMillis());
        return exponentialBackOff;
      }