Spring 云流 - kafka - 空确认 Header
Spring cloud Stream - kafka - Null Acknowledgement Header
我想使用 spring 云流手动提交偏移量 - 仅当消息处理成功时。
这是我的代码 - application.yml & Handler Class
public void process(Message<?> message) {
System.out.println(message.getPayload());
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
---------------------------------------------------------------------------------
spring:
application:
name: springCloud
cloud:
stream:
default-binder: kafka
kafka:
bindings:
myChannel:
consumer:
autoCommitOffset: false
但我的确认 object 为空,因为 header object 'kafka_acknowledgement' 本身不存在。
- 如何获得确认object?
- 我的要求是只有在处理成功时才提交偏移量,如果处理失败我不想从通道中弹出消息以便稍后读取。
上面的代码是否足以实现这一点?
您使用的是什么版本?
在 3.1 中,autoCommitOffset
被弃用,取而代之的是设置 ackMode
(在本例中为 manual
);但是,看起来 autoCommitOffset
现在已被完全忽略而不是弃用。
使用yaml文件时,请使用属性 'auto-commit-offset'.
我想使用 spring 云流手动提交偏移量 - 仅当消息处理成功时。 这是我的代码 - application.yml & Handler Class
public void process(Message<?> message) {
System.out.println(message.getPayload());
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
---------------------------------------------------------------------------------
spring:
application:
name: springCloud
cloud:
stream:
default-binder: kafka
kafka:
bindings:
myChannel:
consumer:
autoCommitOffset: false
但我的确认 object 为空,因为 header object 'kafka_acknowledgement' 本身不存在。
- 如何获得确认object?
- 我的要求是只有在处理成功时才提交偏移量,如果处理失败我不想从通道中弹出消息以便稍后读取。 上面的代码是否足以实现这一点?
您使用的是什么版本?
在 3.1 中,autoCommitOffset
被弃用,取而代之的是设置 ackMode
(在本例中为 manual
);但是,看起来 autoCommitOffset
现在已被完全忽略而不是弃用。
使用yaml文件时,请使用属性 'auto-commit-offset'.