How to not commit offsets in KafkaListener

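I am using KafkaListener and want to control whether offsets are committed depending on whether the message was processed successfully. To do that I use the code below, but it throws an exception at nack(index, sleep):

    @KafkaListener(topics = "topic-name", groupId = "group")
    public void someMethod(@Header(KafkaHeaders.OFFSET) int offset, Acknowledgment ack, String message) {
        try {
            processMessage(message);
            ack.acknowledge();
        } catch (Exception e) {
            // This call throws the exception shown below.
            ack.nack(offset, 1000);
        }
    }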

I have set ackMode to manual and the auto-commit property to false. When message processing fails, nack throws:

      Exception: nack(index, sleep) is not supported by this Acknowledgement.

Any other way of handling this situation is also welcome.

nack(index, sleep) is for batch listeners (List<String> messages). The index tells the container which message in the list failed.

For a record listener, use nack(sleep); the container already knows which record failed.
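A minimal sketch of the record-listener shape described above. So that it compiles without Spring on the classpath, it uses a local stand-in interface `Ack` (hypothetical, not Spring's class) with only the two methods a record listener calls. `processMessage` and `RecordingAck` are also made up for illustration. In a real application the method would carry `@KafkaListener` and take Spring Kafka's `Acknowledgment`.

```java
import java.util.ArrayList;
import java.util.List;

public class RecordListenerSketch {

    /** Hypothetical stand-in for the Acknowledgment methods a record listener uses. */
    interface Ack {
        void acknowledge();
        void nack(long sleepMillis);
    }

    /** Hypothetical processing step: rejects empty messages. */
    static void processMessage(String message) {
        if (message == null || message.trim().isEmpty()) {
            throw new IllegalArgumentException("empty message");
        }
    }

    // In a real application this method would be annotated with
    // @KafkaListener(topics = "topic-name", groupId = "group").
    static void someMethod(String message, Ack ack) {
        try {
            processMessage(message);
            ack.acknowledge();   // success: acknowledge the record
        } catch (Exception e) {
            ack.nack(1000);      // failure: only a sleep time, no index
        }
    }

    /** Test double that records which method the listener called. */
    static final class RecordingAck implements Ack {
        final List<String> calls = new ArrayList<>();

        @Override
        public void acknowledge() {
            calls.add("acknowledge");
        }

        @Override
        public void nack(long sleepMillis) {
            calls.add("nack(" + sleepMillis + ")");
        }
    }

    public static void main(String[] args) {
        RecordingAck ok = new RecordingAck();
        someMethod("hello", ok);

        RecordingAck failed = new RecordingAck();
        someMethod("", failed);

        System.out.println(ok.calls + " " + failed.calls);
    }
}
```

The only change relative to the question's listener is that the offset header parameter is gone and nack receives just the sleep time. Newer Spring Kafka versions may prefer a Duration argument for the sleep, so check the Javadoc for the version in use.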

See the documentation:

Starting with version 2.3, the Acknowledgment interface has two additional methods nack(long sleep) and nack(int index, long sleep). The first one is used with a record listener, the second with a batch listener. Calling the wrong method for your listener type will throw an IllegalStateException.

With a record listener, when nack() is called, any pending offsets are committed, the remaining records from the last poll are discarded, and seeks are performed on their partitions so that the failed record and unprocessed records are redelivered on the next poll(). The consumer thread can be paused before redelivery, by setting the sleep argument. This is similar functionality to throwing an exception when the container is configured with a SeekToCurrentErrorHandler.

When using a batch listener, you can specify the index within the batch where the failure occurred. When nack() is called, offsets will be committed for records before the index and seeks are performed on the partitions for the failed and discarded records so that they will be redelivered on the next poll(). This is an improvement over the SeekToCurrentBatchErrorHandler, which can only seek the entire batch for redelivery.
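To make the batch behaviour concrete, here is a small plain-Java model of what the quoted paragraph describes. It is not Spring Kafka code: `BatchNackModel`, `Outcome`, and `nack` are hypothetical names, and the model assumes all records in the batch come from a single partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchNackModel {

    /** Hypothetical result type: which offsets are committed and which come back. */
    static final class Outcome {
        final List<Long> committed;    // records before the index
        final List<Long> redelivered;  // the failed record plus the discarded rest

        Outcome(List<Long> committed, List<Long> redelivered) {
            this.committed = committed;
            this.redelivered = redelivered;
        }
    }

    /** Splits a batch at the failure index, as the documentation describes. */
    static Outcome nack(List<Long> batchOffsets, int index) {
        if (index < 0 || index >= batchOffsets.size()) {
            throw new IllegalArgumentException("index out of range: " + index);
        }
        return new Outcome(
                new ArrayList<>(batchOffsets.subList(0, index)),
                new ArrayList<>(batchOffsets.subList(index, batchOffsets.size())));
    }

    public static void main(String[] args) {
        // A batch of five records with Kafka offsets 10..14; the third one fails.
        Outcome outcome = nack(Arrays.asList(10L, 11L, 12L, 13L, 14L), 2);
        System.out.println("committed:   " + outcome.committed);
        System.out.println("redelivered: " + outcome.redelivered);
    }
}
```

Note that the index is a position within the polled list (2 here), not the record's Kafka offset (12). The listener in the question passed the offset header as the index, which would be the wrong value even in a batch listener.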