Spring Kafka AckOnError
Spring Kafka AckOnError
我已经使用 DeadLetterPublisheingRecoverer 配置了 SeekToErrorHandler
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(primaryConsumerFactory());
factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
factory.setAutoStartup(true);
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setErrorHandler(new SeekToErrorHandler(new DeadLetterPublisheingRecoverer(kafkaTemplate()),3));
当侦听器(或验证器)抛出异常时,重试三次后,消息将发布到死信。
这里的问题是下次重新启动我的 spring 引导应用程序(或侦听器容器)时,相同的消息再次传递给侦听器并经过整个序列并最终落在死信上。有什么办法可以避免这种情况吗?
我已禁用自动提交并设置了 AckOnError(false) 和 AckMode(AckMode.RECORD);
在 SeekToErrorHandler 中,我发现围绕 SeekToUtil 的逻辑会抛出异常,直到配置的迭代次数完成并最终调用 BiConsumer 的 accept 方法(死信发布)。所以容器应该在最后一步(发布到死信)提交记录,对吗?我还浏览了 org.springframework.kafka.listener.ContainerProperties
中对 ackOnError(boolean) 方法的评论
当 setAckOnError(true) 时,我可以通过三次重试找到正确的行为,并最终调用死信发布者。侦听器容器重新启动时未重新传送消息
Springkafka版本是2.2.6
在 2.3 中我们添加了 ackAfterHandle
; SeekToCurrentErrorHandler
.
的默认值为 true
@Override
public boolean isAckAfterHandle() {
return this.ackAfterHandle;
}
/**
* Set to false to tell the container to NOT commit the offset for a recovered record.
* @param ackAfterHandle false to suppress committing the offset.
* @since 2.3.2
*/
public void setAckAfterHandle(boolean ackAfterHandle) {
this.ackAfterHandle = ackAfterHandle;
}
在 2.4 中,所有错误处理程序默认为 true。
我已经使用 DeadLetterPublisheingRecoverer 配置了 SeekToErrorHandler
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(primaryConsumerFactory());
factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
factory.setAutoStartup(true);
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setErrorHandler(new SeekToErrorHandler(new DeadLetterPublisheingRecoverer(kafkaTemplate()),3));
当侦听器(或验证器)抛出异常时,重试三次后,消息将发布到死信。
这里的问题是下次重新启动我的 spring 引导应用程序(或侦听器容器)时,相同的消息再次传递给侦听器并经过整个序列并最终落在死信上。有什么办法可以避免这种情况吗?
我已禁用自动提交并设置了 AckOnError(false) 和 AckMode(AckMode.RECORD);
在 SeekToErrorHandler 中,我发现围绕 SeekToUtil 的逻辑会抛出异常,直到配置的迭代次数完成并最终调用 BiConsumer 的 accept 方法(死信发布)。所以容器应该在最后一步(发布到死信)提交记录,对吗?我还浏览了 org.springframework.kafka.listener.ContainerProperties
中对 ackOnError(boolean) 方法的评论当 setAckOnError(true) 时,我可以通过三次重试找到正确的行为,并最终调用死信发布者。侦听器容器重新启动时未重新传送消息
Springkafka版本是2.2.6
在 2.3 中我们添加了 ackAfterHandle
; SeekToCurrentErrorHandler
.
@Override
public boolean isAckAfterHandle() {
return this.ackAfterHandle;
}
/**
* Set to false to tell the container to NOT commit the offset for a recovered record.
* @param ackAfterHandle false to suppress committing the offset.
* @since 2.3.2
*/
public void setAckAfterHandle(boolean ackAfterHandle) {
this.ackAfterHandle = ackAfterHandle;
}
在 2.4 中,所有错误处理程序默认为 true。