Auto commit in Kafka with Spring Cloud Stream
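I have an application in which I want to manually (n)ack Kafka messages. According to the Spring Cloud documentation, this should be done with autoCommitOffset.
However, in my application, even with this property defined, the KafkaHeaders.ACKNOWLEDGMENT header is still null.
My configuration looks like this: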
spring.cloud.stream.kafka.binder.brokers=${KAFKA_BROKER_LIST}
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.mytopic.destination=MyInputTopic
spring.cloud.stream.bindings.mytopic.group=myConsumerGroup
spring.cloud.stream.kafka.bindings.mytopic.consumer.autoCommitOffset=false
My consumer:
@StreamListener("myTopic")
public void consume(@NotNull @Valid Message<MyTopic> message) {
    MyTopic payload = message.getPayload();
    Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); // always null
}
I am using Java 13, Spring Boot 2.2.5.RELEASE and Spring Cloud Hoxton.SR1.
Any help is appreciated.
I just copied your properties and it works fine for me...
GenericMessage [payload=foo, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@55d4844d, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=MyInputTopic, kafka_receivedTimestamp=1589488691039, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = MyInputTopic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1589488691039, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@572887c3), contentType=application/json, kafka_groupId=myConsumerGroup}]
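import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    // Prints the whole message, including the kafka_acknowledgment header.
    @StreamListener(Sink.INPUT)
    public void listen(Message<String> in) {
        System.out.println(in);
    }

    // Sends one test record to the input topic at startup.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("MyInputTopic", "foo".getBytes());
        };
    }
}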
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.input.destination=MyInputTopic
spring.cloud.stream.bindings.input.group=myConsumerGroup
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
I found out why my consumer was not working as expected:
In my configuration I had something like spring.cloud.stream.bindings.mytopic.destination=MyInputTopic, but the stream binding was declared like this:
@StreamListener("Mytopic")
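Apparently, configuration prefixed with spring.cloud.stream.bindings is case-insensitive (all of those settings worked as expected), but configuration prefixed with spring.cloud.stream.kafka.bindings is case-sensitive. The autoCommitOffset=false setting under mytopic was therefore never applied to the Mytopic binding, which caused my problem.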
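A mismatch like this is easy to miss by eye, so it may help to compare the binding names in the property keys against the name used in @StreamListener. The following is only a sketch in plain Java: BindingNameCheck, bindingNameOf and caseMismatches are hypothetical names that are not part of Spring Cloud Stream, and the case-sensitivity behaviour itself is taken from the observation above rather than from the binder's documentation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Spring Cloud Stream API.
final class BindingNameCheck {

    private static final String CORE_PREFIX = "spring.cloud.stream.bindings.";
    private static final String KAFKA_PREFIX = "spring.cloud.stream.kafka.bindings.";

    // Returns the binding name embedded in a property key,
    // or null if the key is not a per-binding property.
    static String bindingNameOf(String key) {
        for (String prefix : new String[] {KAFKA_PREFIX, CORE_PREFIX}) {
            if (key.startsWith(prefix)) {
                String rest = key.substring(prefix.length());
                int dot = rest.indexOf('.');
                return dot < 0 ? rest : rest.substring(0, dot);
            }
        }
        return null;
    }

    // Returns the keys whose binding name equals listenerName when case is
    // ignored, but is not an exact match.
    static List<String> caseMismatches(List<String> keys, String listenerName) {
        List<String> result = new ArrayList<>();
        for (String key : keys) {
            String name = bindingNameOf(key);
            if (name != null
                    && name.equalsIgnoreCase(listenerName)
                    && !name.equals(listenerName)) {
                result.add(key);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Property keys from the question; "Mytopic" is the listener name from the answer.
        List<String> keys = List.of(
                "spring.cloud.stream.kafka.binder.brokers",
                "spring.cloud.stream.default.contentType",
                "spring.cloud.stream.bindings.mytopic.destination",
                "spring.cloud.stream.bindings.mytopic.group",
                "spring.cloud.stream.kafka.bindings.mytopic.consumer.autoCommitOffset");
        for (String key : caseMismatches(keys, "Mytopic")) {
            System.out.println("binding name differs in case from listener: " + key);
        }
    }
}
```

Run against the keys from the question, this flags the three mytopic keys. The simplest fix is to use exactly the same spelling for the binding name in every property key and in @StreamListener.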