DeadLetterPublishingRecoverer 如何与批量消费者一起工作?

How does DeadLetterPublishingRecoverer work with batch consumer?

我想了解 DeadLetterPublishingRecoverer 在批量消费者的情况下是如何工作的。我能看到的文档中的大多数示例都是单个记录消费者的示例。

我想为批处理消费者实现重试逻辑。如果重试在阈值后也失败(假设 3 次),则 DeadLetterPublishingRecoverer 应该将消息发布到死信主题。顺序对我的用例无关紧要。

我不确定是否:

DeadLetterPublishingRecoverer会将失败的记录发布到DLT,批次中的其余记录将由消费者处理。

或者

DeadLetterPublishingRecoverer 会将失败的记录以及整个批次(包括好消息)发布到 DLT。

public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory(KafkaTemplate<String, String> kafkaTemplate) {
       
        ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    
        concurrentKafkaListenerContainerFactory.setBatchListener(true);
        
        DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
        RecoveringBatchErrorHandler recoveringBatchErrorHandler = new RecoveringBatchErrorHandler(deadLetterPublishingRecoverer,new FixedBackOff(1000L, 3L));

        concurrentKafkaListenerContainerFactory.setBatchErrorHandler(recoveringBatchErrorHandler);

        return concurrentKafkaListenerContainerFactory;
    }

随着RecoveringBatchErrorHandlerDefaultErrorHandler在2.8及以后的版本),监听器必须抛出一个BatchListenerFailedException;这会告诉错误处理程序在批处理中发生错误的位置。

之前的记录将提交偏移量;重试失败的记录(以及批次中的后续记录);当重试次数用完后,将失败的记录发送给DLT,其余记录重试。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh

https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#recovering-batch-eh

对于 2.8 之前的版本。