有没有什么方法可以使用 Acknowledge.nack for spring-cloud-stream-binder-kafka 在 maxAttempts 时间内重试?

Is there any method to retry within maxAttempts times using Acknowledge.nack for spring-cloud-stream-binder-kafka?

我正在尝试在kafka中使用consuming batches,我发现文档说不支持重试如下。

Retry within the binder is not supported when using batch mode, so maxAttempts will be overridden to 1. You can configure a SeekToCurrentBatchErrorHandler (using a ListenerContainerCustomizer) to achieve similar functionality to retry in the binder. You can also use a manual AckMode and call Ackowledgment.nack(index, sleep) to commit the offsets for a partial batch and have the remaining records redelivered. Refer to the Spring for Apache Kafka documentation for more information about these techniques

如果我使用Acknowledge.nack(index,sleep),它会在发生错误时无限重试。有什么方法可以使用 Acknowledge.nack 在 maxAttempts 时间内重试吗?

代码如下

@StreamListener(Sink.INPUT)
    public void consume(@Payload List<PayLoad> payloads, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
        
        try {
            consume(payloads);
            acknowledgment.acknowledge();
        } catch (Exception e) {
            acknowledgment.nack(0, 50);
            
        }

    }

没有;您必须自己跟踪重试次数。

但是,从 2.5 版开始,您现在可以使用 RecoveringBatchErrorHandler 抛出特定异常来告诉处理程序哪个记录失败,它会提交该记录之前记录的偏移量并将重试逻辑应用于失败的记录。

https://docs.spring.io/spring-kafka/reference/html/#recovering-batch-eh