将确认传递给 spring KafkaListener 消费者方法
Passing Acknowledgment to a spring KafkaListener consumer method
我正在尝试关闭 kafka 中的自动提交,而是手动执行。为此,在我的 application.properties
中我设置了 spring.kafka.properties.enable.auto.commit=false
我目前还有一个方法 header:
@KafkaListener(id="${"+ KafkaConfiguration.APP_REQUEST_ID +"}", topics = "${"+KafkaConfiguration.PPA_REQUEST_TOPIC +"}")
public void receive(@Payload String message,
@Headers MessageHeaders headers)
我的理解是,为了手动提交,我需要访问 Acknowledgement
object,它将作为参数传递给我的 receive()
方法。我的问题:如果我将 header 更改为
@KafkaListener(id="${"+ KafkaConfiguration.APP_REQUEST_ID +"}", topics = "${"+KafkaConfiguration.APP_REQUEST_TOPIC +"}")
public void receive(@Payload String message,
@Headers MessageHeaders headers,
Acknowledgment acknowledgment)
Acknowledgment
会自动传入,还是我需要进行其他更改?
是的,这样一个 Acknowledgment
实例将被传递到您的侦听器方法中。成功处理收到的消息后,您应该调用 acknowledgement.acknowledge();
(仅当您想手动确认时才需要)
我也会切换到 MANUAL
ackmode 并关闭自动提交(你已经做了),例如通过提供自定义 Spring 启动配置 class - 也许也可以通过 application.properties
配置:
@Configuration
class KafkaConfiguration {
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
final Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
consumerProperties.put(ENABLE_AUTO_COMMIT_CONFIG, false);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(MANUAL);
return factory;
}
}
}
如果您不想手动确认,那么不同的确认模式可能更方便且更适合:
AckMode.RECORD
很舒服,因为如果您的侦听器的方法实现成功完成(没有抛出异常),传递到您的侦听器方法的 Kafka 记录将自动被确认。
简答,不需要传递Acknowledgment
,它是收到消息的一部分。
When using manual AckMode, you can also provide the listener with the Acknowledgment. The following example also shows how to use a different container factory.
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
来自 @KafkaListener 文档:
允许带注释的方法具有类似于 MessageMapping 提供的灵活签名,即
- ConsumerRecord 访问原始 Kafka 消息
- 确认手动确认
- @Payload-annotated 方法参数包括验证支持
- @Header-annotated 方法参数提取特定的 header 值,由 KafkaHeaders
定义
- @Headers-annotated 参数也必须可分配给 Map 以访问所有 headers.
- 用于访问所有 headers 的 MessageHeaders 参数。
- 用于方便访问所有方法参数的 MessageHeaderAccessor。
如果我们想通过方法注入传入一个 spring 托管 bean 怎么办?我试图通过方法注入将服务 class 传递给 kafka 侦听器 -
private fun defaultListener(payload: ByteArray, @Headers messageHeaders: MessageHeaders, ack: Acknowledgment, callbackService: CallbackService) {
// Do something
}
我得到以下异常 -
org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.test.kafka-demo.service.CallbackService]
如果我使依赖服务 class 自动连接,工作正常。
我正在尝试关闭 kafka 中的自动提交,而是手动执行。为此,在我的 application.properties
中我设置了 spring.kafka.properties.enable.auto.commit=false
我目前还有一个方法 header:
@KafkaListener(id="${"+ KafkaConfiguration.APP_REQUEST_ID +"}", topics = "${"+KafkaConfiguration.PPA_REQUEST_TOPIC +"}")
public void receive(@Payload String message,
@Headers MessageHeaders headers)
我的理解是,为了手动提交,我需要访问 Acknowledgement
object,它将作为参数传递给我的 receive()
方法。我的问题:如果我将 header 更改为
@KafkaListener(id="${"+ KafkaConfiguration.APP_REQUEST_ID +"}", topics = "${"+KafkaConfiguration.APP_REQUEST_TOPIC +"}")
public void receive(@Payload String message,
@Headers MessageHeaders headers,
Acknowledgment acknowledgment)
Acknowledgment
会自动传入,还是我需要进行其他更改?
是的,这样一个 Acknowledgment
实例将被传递到您的侦听器方法中。成功处理收到的消息后,您应该调用 acknowledgement.acknowledge();
(仅当您想手动确认时才需要)
我也会切换到 MANUAL
ackmode 并关闭自动提交(你已经做了),例如通过提供自定义 Spring 启动配置 class - 也许也可以通过 application.properties
配置:
@Configuration
class KafkaConfiguration {
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
final Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
consumerProperties.put(ENABLE_AUTO_COMMIT_CONFIG, false);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(MANUAL);
return factory;
}
}
}
如果您不想手动确认,那么不同的确认模式可能更方便且更适合:
AckMode.RECORD
很舒服,因为如果您的侦听器的方法实现成功完成(没有抛出异常),传递到您的侦听器方法的 Kafka 记录将自动被确认。
简答,不需要传递Acknowledgment
,它是收到消息的一部分。
When using manual AckMode, you can also provide the listener with the Acknowledgment. The following example also shows how to use a different container factory.
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
来自 @KafkaListener 文档:
允许带注释的方法具有类似于 MessageMapping 提供的灵活签名,即
- ConsumerRecord 访问原始 Kafka 消息
- 确认手动确认
- @Payload-annotated 方法参数包括验证支持
- @Header-annotated 方法参数提取特定的 header 值,由 KafkaHeaders 定义
- @Headers-annotated 参数也必须可分配给 Map 以访问所有 headers.
- 用于访问所有 headers 的 MessageHeaders 参数。
- 用于方便访问所有方法参数的 MessageHeaderAccessor。
如果我们想通过方法注入传入一个 spring 托管 bean 怎么办?我试图通过方法注入将服务 class 传递给 kafka 侦听器 -
private fun defaultListener(payload: ByteArray, @Headers messageHeaders: MessageHeaders, ack: Acknowledgment, callbackService: CallbackService) {
// Do something
}
我得到以下异常 -
org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.test.kafka-demo.service.CallbackService]
如果我使依赖服务 class 自动连接,工作正常。