Acknowledgement.acknowledge() 在 spring-kafka @KafkaListener 中抛出异常
Acknowledgement.acknowledge() throwing exception in spring-kafka @KafkaListener
当我将 enable.auto.commit 设置为 false 并尝试使用基于注释的 spring-kafka @KafkaListener 手动提交偏移量时,我得到一个 org.springframework.kafka.listener.ListenerExecutionFailedException: Listener 方法无法使用传入消息调用
我有一个非常简单的代码如下:
@KafkaListener(id = "someid", topics = "${demo.topic}", containerFactory = "someContainerFactory")
public void listenFooGroup(String message, Acknowledgement ack) {
System.out.println("Received Messasge in group 'foo': " + message);
// TODO: Do something with the message
}
当我从生产者发送消息时,出现以下异常:
org.springframework.kafka.listener.ListenerExecutionFailedException: 传入消息无法调用侦听器方法。
端点处理程序详细信息:
方法[public void com.****.*****.*******.KafkaMessageListener.listenFooGroup(java.lang.String,org.springframework.kafka.support.Acknowledgment)]
Bean [com.****.*****.*******.KafkaMessageListener@5856dbe4]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=test, headers={kafka_offset=57, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=demotopic}],
failedMessage=GenericMessage [payload=test, headers={kafka_offset=57, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=demotopic}]
请帮忙。 TIA.
您必须将容器工厂的 containerProperties
ackMode 设置为 MANUAL
或 MANUAL_IMMEDIATE
才能获得 Acknowledgment
对象。
对于其他 ack 模式,容器负责提交偏移量。
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE)
或者设置 ....ackMode
属性 如果使用 Spring Boot
当我将 enable.auto.commit 设置为 false 并尝试使用基于注释的 spring-kafka @KafkaListener 手动提交偏移量时,我得到一个 org.springframework.kafka.listener.ListenerExecutionFailedException: Listener 方法无法使用传入消息调用
我有一个非常简单的代码如下:
@KafkaListener(id = "someid", topics = "${demo.topic}", containerFactory = "someContainerFactory")
public void listenFooGroup(String message, Acknowledgement ack) {
System.out.println("Received Messasge in group 'foo': " + message);
// TODO: Do something with the message
}
当我从生产者发送消息时,出现以下异常:
org.springframework.kafka.listener.ListenerExecutionFailedException: 传入消息无法调用侦听器方法。
端点处理程序详细信息:
方法[public void com.****.*****.*******.KafkaMessageListener.listenFooGroup(java.lang.String,org.springframework.kafka.support.Acknowledgment)]
Bean [com.****.*****.*******.KafkaMessageListener@5856dbe4]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=test, headers={kafka_offset=57, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=demotopic}], failedMessage=GenericMessage [payload=test, headers={kafka_offset=57, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=demotopic}]
请帮忙。 TIA.
您必须将容器工厂的 containerProperties
ackMode 设置为 MANUAL
或 MANUAL_IMMEDIATE
才能获得 Acknowledgment
对象。
对于其他 ack 模式,容器负责提交偏移量。
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE)
或者设置 ....ackMode
属性 如果使用 Spring Boot