Spring Cloud @StreamListener doesn't provide an Acknowledgment header even with auto-commit set to false
I'm stuck on a simple example of managing Kafka offsets and commits manually. I have an application using Spring Cloud Stream with enable.auto.commit = false (visible in the startup logs when the ConsumerConfig values are printed), but when I consume a message it still doesn't provide the acknowledgment header.
Here is my listener:
@StreamListener(Sink.INPUT)
public void handleSchedulerMessage(@Payload SchedulerEvent event,
        @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
    log.debug("[message={}]", event);
    // todo: processing
    log.debug("Event processed successfully [event={}]", event);
}
The configuration YAML is also simple:
spring:
  application:
    name: scheduler
  cloud:
    stream:
      kafka:
        binder:
          brokers: *kafka-broker*:9092
          zkNodes: *zookeeper*:2181
      bindings:
        input:
          destination: scheduler
          contentType: application/json
          consumer:
            autoCommitOffset: false
And as soon as I send a message, this error pops up:
2018-05-22 11:38:32.470 ERROR 11651 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: Missing header 'kafka_acknowledgment' for method parameter type [interface org.springframework.kafka.support.Acknowledgment], failedMessage=GenericMessage [payload=byte[38], headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@fdac355, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=scheduler, kafka_receivedTimestamp=1526981909241, contentType=application/json}]
The received message doesn't contain the required header, contrary to what the documentation says about disabling auto-commit:
Whether to autocommit offsets when a message has been processed. If set to false, a header with the key kafka_acknowledgment of the type org.springframework.kafka.support.Acknowledgment header will be present in the inbound message. Applications may use this header for acknowledging messages.
The code isn't complicated, and I'm not using any pre-generated project. The examples don't match what I'm doing, so I have no idea what I might be missing.
It looks like your YAML is missing a level of nesting.
According to the documentation, the property must be:
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
But your sample resolves to:
spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false
Note the extra .kafka. segment in the middle of the correct property, which is missing from your sample.
I can't say what the cleanest way to manage this in YAML is, but this is what we had to do to make it work.
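For illustration, a sketch of what the corrected YAML nesting could look like, keeping the same placeholder broker values from the question (only the position of the bindings section changes, moving it under kafka so the property resolves to spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset):

```yaml
spring:
  application:
    name: scheduler
  cloud:
    stream:
      kafka:
        binder:
          brokers: *kafka-broker*:9092
          zkNodes: *zookeeper*:2181
        # bindings moved under "kafka" so the consumer property is
        # picked up by the Kafka binder instead of the core binding
        bindings:
          input:
            consumer:
              autoCommitOffset: false
      # destination and contentType stay on the core binding
      bindings:
        input:
          destination: scheduler
          contentType: application/json
```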