Auto commit in Kafka with Spring Cloud Stream

I have an application in which I want to (n)ack Kafka messages manually. According to the spring cloud documentation, this should be done with the autoCommitOffset property.

However, in my application, even with that property defined, the KafkaHeaders.ACKNOWLEDGMENT header is still null.

My configuration looks like this:

spring.cloud.stream.kafka.binder.brokers=${KAFKA_BROKER_LIST}
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.mytopic.destination=MyInputTopic
spring.cloud.stream.bindings.mytopic.group=myConsumerGroup
spring.cloud.stream.kafka.bindings.mytopic.consumer.autoCommitOffset=false

My consumer:

@StreamListener("myTopic")
public void consume(@NotNull @Valid Message<MyTopic> message) {
    MyTopic payload = message.getPayload();
    Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); // always null
}
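
(For context: the binding name "myTopic" used in @StreamListener is not the standard Sink.INPUT, so it is presumably declared on a custom bindings interface. The interface and method names below are assumptions added for illustration, not part of the original post.)

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

// Hypothetical bindings interface; the name passed to @Input is the binding name
// referenced by @StreamListener and by the spring.cloud.stream.bindings.<name>.* properties.
public interface MyTopicSink {

    @Input("myTopic")
    SubscribableChannel myTopic();
}

It would then be enabled with @EnableBinding(MyTopicSink.class) on a configuration class.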

I am using Java 13 with Spring Boot 2.2.5.RELEASE and Spring Cloud Hoxton.SR1.

Any help is appreciated.

I just copied your properties and it works fine for me...

GenericMessage [payload=foo, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@55d4844d, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=MyInputTopic, kafka_receivedTimestamp=1589488691039, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = MyInputTopic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1589488691039, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@572887c3), contentType=application/json, kafka_groupId=myConsumerGroup}]

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(Message<String> in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("MyInputTopic", "foo".getBytes());
        };
    }

}

spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.input.destination=MyInputTopic
spring.cloud.stream.bindings.input.group=myConsumerGroup
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
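
For completeness: with autoCommitOffset=false the binder adds the kafka_acknowledgment header and the offset must then be committed manually by calling acknowledge(). A minimal sketch of how the listener above could do that (the null check is only defensive, not required once the header is present):

import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;

    @StreamListener(Sink.INPUT)
    public void listen(Message<String> in) {
        System.out.println(in);
        // Header is only populated when autoCommitOffset=false; acknowledge() commits the offset.
        Acknowledgment acknowledgment =
                in.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
            acknowledgment.acknowledge();
        }
    }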

I found out why my consumer was not working as expected:

In my configuration I had something like spring.cloud.stream.bindings.mytopic.destination=MyInputTopic, but the stream binding was declared like this:

@StreamListener("Mytopic")

Apparently, properties prefixed with spring.cloud.stream.bindings are case-insensitive (since all of that configuration worked as expected), but properties prefixed with spring.cloud.stream.kafka.bindings are case-sensitive, and that mismatch was causing my problem.
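
In other words, keeping the case of the binding name identical everywhere avoids the problem. A corrected sketch of the relevant lines, using the lowercase name from the properties:

spring.cloud.stream.bindings.mytopic.destination=MyInputTopic
spring.cloud.stream.bindings.mytopic.group=myConsumerGroup
spring.cloud.stream.kafka.bindings.mytopic.consumer.autoCommitOffset=false

@StreamListener("mytopic")
public void consume(@NotNull @Valid Message<MyTopic> message) { ... }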