Spring 云流 - kafka - 空确认 Header

Spring cloud Stream - kafka - Null Acknowledgement Header

我想使用 spring 云流手动提交偏移量 - 仅当消息处理成功时。 这是我的代码 - application.yml & Handler Class

      public void process(Message<?> message) {
         System.out.println(message.getPayload());
         Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
           System.out.println("Acknowledgment provided");
           acknowledgment.acknowledge();
        }
    }
---------------------------------------------------------------------------------
spring:
  application:
      name: springCloud
  cloud:
      stream:
          default-binder: kafka
          kafka:
              bindings:
                  myChannel:
                      consumer:
                          autoCommitOffset: false

但我的确认 object 为空,因为 header object 'kafka_acknowledgement' 本身不存在。

  1. 如何获得确认object?
  2. 我的要求是只有在处理成功时才提交偏移量,如果处理失败我不想从通道中弹出消息以便稍后读取。 上面的代码是否足以实现这一点?

您使用的是什么版本?

在 3.1 中,autoCommitOffset 被弃用,取而代之的是设置 ackMode(在本例中为 manual);但是,看起来 autoCommitOffset 现在已被完全忽略而不是弃用。

使用yaml文件时,请使用属性 'auto-commit-offset'.