Kafka 消费者手册提交偏移量
Kafka consumer manual commit offset
我正在实施和 dsl spring 从 Kafka 获取消息的集成流程
代码片段:
return IntegrationFlows.from(
Kafka.messageDrivenChannelAdapter(new DefaultKafkaConsumerFactory(kafkaTelemetryDataConsumerConfiguration.getConsumerProperties()),
kafkaPropertiesConfiguration.getTelemetryDataTopic()))
})
.handle(bla.someImportantOperation())
//TODO:do manual commit here
//.handle(consumer.commitSync())
.get();
我想知道如何手动 commitSync 但只有在 .handle(bla.someImportantOperation())
成功完成之后。
我不知道如何获取消费者参考,因为我使用的是 DefaultKafkaConsumerFactory,希望得到任何帮助。
这些是我用来创建消费者的 consumerProperties:
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPropertiesConfiguration.getBootstrapServers());
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, kafkaPropertiesConfiguration.getClientIdConfig());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaPropertiesConfiguration.getGroupIdConfig());
consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
Kafka.messageDrivenChannelAdapter()
为您提供了一个配置器挂钩:
.configureListenerContainer(c ->
c.ackMode(ContainerProperties.AckMode.MANUAL))
注意我提供的选项。
阅读其 Javadoc,然后 AcknowledgingMessageListener
。
提到了 Acknowledgment
。这个通过 KafkaHeaders.ACKNOWLEDGMENT
.
出现在消息 headers 中
所以,您 //.handle(consumer.commitSync())
中需要的就是这样的东西:
.handle(m -> headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge())
在 Spring 中查看有关 Apache Kafka 文档的更多信息:https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#committing-offsets
我正在实施和 dsl spring 从 Kafka 获取消息的集成流程
代码片段:
return IntegrationFlows.from(
Kafka.messageDrivenChannelAdapter(new DefaultKafkaConsumerFactory(kafkaTelemetryDataConsumerConfiguration.getConsumerProperties()),
kafkaPropertiesConfiguration.getTelemetryDataTopic()))
})
.handle(bla.someImportantOperation())
//TODO:do manual commit here
//.handle(consumer.commitSync())
.get();
我想知道如何手动 commitSync 但只有在 .handle(bla.someImportantOperation())
成功完成之后。
我不知道如何获取消费者参考,因为我使用的是 DefaultKafkaConsumerFactory,希望得到任何帮助。
这些是我用来创建消费者的 consumerProperties:
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPropertiesConfiguration.getBootstrapServers());
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, kafkaPropertiesConfiguration.getClientIdConfig());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaPropertiesConfiguration.getGroupIdConfig());
consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
Kafka.messageDrivenChannelAdapter()
为您提供了一个配置器挂钩:
.configureListenerContainer(c ->
c.ackMode(ContainerProperties.AckMode.MANUAL))
注意我提供的选项。
阅读其 Javadoc,然后 AcknowledgingMessageListener
。
提到了 Acknowledgment
。这个通过 KafkaHeaders.ACKNOWLEDGMENT
.
所以,您 //.handle(consumer.commitSync())
中需要的就是这样的东西:
.handle(m -> headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge())
在 Spring 中查看有关 Apache Kafka 文档的更多信息:https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#committing-offsets