Spring Kafka AckOnError

Spring Kafka AckOnError

我已经使用 DeadLetterPublisheingRecoverer 配置了 SeekToErrorHandler

 ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(primaryConsumerFactory());
    factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
    factory.setAutoStartup(true);
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.RECORD);               
    factory.setErrorHandler(new SeekToErrorHandler(new DeadLetterPublisheingRecoverer(kafkaTemplate()),3));

当侦听器(或验证器)抛出异常时,重试三次后,消息将发布到死信。

这里的问题是下次重新启动我的 spring 引导应用程序(或侦听器容器)时,相同的消息再次传递给侦听器并经过整个序列并最终落在死信上。有什么办法可以避免这种情况吗?

我已禁用自动提交并设置了 AckOnError(false) 和 AckMode(AckMode.RECORD);

在 SeekToErrorHandler 中,我发现围绕 SeekToUtil 的逻辑会抛出异常,直到配置的迭代次数完成并最终调用 BiConsumer 的 accept 方法(死信发布)。所以容器应该在最后一步(发布到死信)提交记录,对吗?我还浏览了 org.springframework.kafka.listener.ContainerProperties

中对 ackOnError(boolean) 方法的评论

当 setAckOnError(true) 时,我可以通过三次重试找到正确的行为,并最终调用死信发布者。侦听器容器重新启动时未重新传送消息

Springkafka版本是2.2.6

在 2.3 中我们添加了 ackAfterHandleSeekToCurrentErrorHandler.

的默认值为 true
@Override
public boolean isAckAfterHandle() {
    return this.ackAfterHandle;
}

/**
 * Set to false to tell the container to NOT commit the offset for a recovered record.
 * @param ackAfterHandle false to suppress committing the offset.
 * @since 2.3.2
 */
public void setAckAfterHandle(boolean ackAfterHandle) {
    this.ackAfterHandle = ackAfterHandle;
}

在 2.4 中,所有错误处理程序默认为 true。

https://github.com/spring-projects/spring-kafka/issues/1273