将确认传递给 spring KafkaListener 消费者方法

Passing Acknowledgment to a spring KafkaListener consumer method

我正在尝试关闭 kafka 中的自动提交,而是手动执行。为此,在我的 application.properties 中我设置了 spring.kafka.properties.enable.auto.commit=false

我目前还有一个方法 header:

@KafkaListener(id="${"+ KafkaConfiguration.APP_REQUEST_ID +"}", topics = "${"+KafkaConfiguration.PPA_REQUEST_TOPIC +"}")
public void receive(@Payload String message,
                    @Headers MessageHeaders headers)

我的理解是,为了手动提交,我需要访问 Acknowledgement object,它将作为参数传递给我的 receive() 方法。我的问题:如果我将 header 更改为

@KafkaListener(id="${"+ KafkaConfiguration.APP_REQUEST_ID +"}", topics = "${"+KafkaConfiguration.APP_REQUEST_TOPIC +"}")
public void receive(@Payload String message,
                    @Headers MessageHeaders headers,
                    Acknowledgment acknowledgment)

Acknowledgment 会自动传入,还是我需要进行其他更改?

是的,这样一个 Acknowledgment 实例将被传递到您的侦听器方法中。成功处理收到的消息后,您应该调用 acknowledgement.acknowledge();(仅当您想手动确认时才需要)

我也会切换到 MANUAL ackmode 并关闭自动提交(你已经做了),例如通过提供自定义 Spring 启动配置 class - 也许也可以通过 application.properties 配置:

@Configuration
class KafkaConfiguration {

        @Bean
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

            final Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
            consumerProperties.put(ENABLE_AUTO_COMMIT_CONFIG, false);

            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.getContainerProperties().setAckMode(MANUAL);

            return factory;
        }
    }
}

如果您不想手动确认,那么不同的确认模式可能更方便且更适合:

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.AckMode.html

AckMode.RECORD 很舒服,因为如果您的侦听器的方法实现成功完成(没有抛出异常),传递到您的侦听器方法的 Kafka 记录将自动被确认。

简答,不需要传递Acknowledgment,它是收到消息的一部分。

根据 documentation

When using manual AckMode, you can also provide the listener with the Acknowledgment. The following example also shows how to use a different container factory.

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

来自 @KafkaListener 文档:

允许带注释的方法具有类似于 MessageMapping 提供的灵活签名,即

  • ConsumerRecord 访问原始 Kafka 消息
  • 确认手动确认
  • @Payload-annotated 方法参数包括验证支持
  • @Header-annotated 方法参数提取特定的 header 值,由 KafkaHeaders
  • 定义
  • @Headers-annotated 参数也必须可分配给 Map 以访问所有 headers.
  • 用于访问所有 headers 的 MessageHeaders 参数。
  • 用于方便访问所有方法参数的 MessageHeaderAccessor。

如果我们想通过方法注入传入一个 spring 托管 bean 怎么办?我试图通过方法注入将服务 class 传递给 kafka 侦听器 -

private fun defaultListener(payload: ByteArray, @Headers messageHeaders: MessageHeaders, ack: Acknowledgment, callbackService: CallbackService) {
 // Do something
}

我得到以下异常 -

org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.test.kafka-demo.service.CallbackService]

如果我使依赖服务 class 自动连接,工作正常。