Spring-Kafka 中的 poll() 和 commitAsync()

poll() and commitAsync() in Spring-Kafka

我正在尝试在 java 中编写一个 kafka 消费者应用程序 Spring引导平台。早些时候,我用普通 java 编写了代码,但是 现在转换为 spring-kafka,因为它可以提供一些优势 普通 java。我确实有几个问题想弄明白。

KafkaConsumer.java :

@Component
public class KafkaConsumer {
    
    @KafkaListener(topics = "topic", groupId = "group")
    public void run(ConsumerRecord<String, GenericRecord> record) {
        
        System.out.println(record);
        
    <-- how to asyncCommit()--> 
    }

}

监听器容器会在监听器正常退出时提交偏移量,具体取决于容器的AckMode 属性; AckMode.BATCH(默认)表示轮询返回的所有记录的偏移量将在所有处理完成后提交,AckMode.RECORD表示每个偏移量将在侦听器退出后立即提交。

sync比。 asyncsyncCommits 容器控制 属性。

首先,我建议您使用 Spring kafka 设置的属性和自动配置,而不是创建自己的,因为它遵循 DRY 原则:不要重复自己。

spring:
  kafka:
    bootstrap-servers: ${app.kafka_brokers}
    consumer:
      auto-offset-reset: ${app.offset_reset}
      enable-auto-commit: false   // <---- disable auto committing
    ssl:
      protocol: ${app.security.protocol}
      key-store-location: ${app.ssl.keystore.location}
      key-store-password:  ${app.ssl.keystore.password}
      trust-store-location: ${app.ssl.truststore.location}
      trust-store-password: ${app.ssl.truststore.password}
  // And other properties
    listener:
      ack-mode: manual // This is what you need

AckMode 文档:https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.AckMode.html

本质上,manual是一个异步确认,而manual_immediate是同步的。

然后在你的 @KafkaListener 组件中你可以注入 org.springframework.kafka.support.Acknowledgment 对象确认你的消息。

@Component
public class KafkaConsumer {
    
    @KafkaListener(topics = "topic", groupId = "group")
    public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgment) {
        
        System.out.println(record);
        
        acknowledgment.acknowledge();
    }

}

这是关于可以注入到 @KafkaListener 方法中的内容的文档:https://docs.spring.io/spring-kafka/reference/html/#message-listeners