Listener method could not be invoked with the incoming message and Backoff none exhausted for ConsumerRecord

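Below is the method definition for my Kafka listener. I think I get the following error when a null or empty string is received as the payload. Could you please help?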

 @KafkaListener(topics = "${kafka.consumer-topic-name.reservation}", groupId = "${kafka.consumer-group-id.test}",
        containerFactory = "kafkaListenerContainerFactory",autoStartup = "${kafka.auto-start.consumer.tets}")
public void consumeReservation(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                               @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String kafkaKey) {}

[org.springframework.kafka.KafkaListenerEndpointContainer #0-0-C-1] ERROR o.s.k.l.SeekToCurrentErrorHandler - Backoff none exhausted for ConsumerRecord(topic = test_topic, partition = 0, leaderEpoch = 2, offset = 453473, CreateTime = 1601962346576, serialized key size = 41, serialized value size = -1, headers = RecordHeaders(headers = [RecordHeader(key = OPERATION, value = [68, 69, 76, 69, 84, 69]), RecordHeader(key = __Key_TypeId__, value = [99, 108, 75, 101, 121])], isReadOnly = false), key = {
"orgId": "1",
"orderId": "U4000024004"}, value = null)

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String)]
Bean [com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer@731702d1]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String): 1 error(s): [Error in object 'payload': codes []; arguments []; default message [Payload value must not be empty]] , failedMessage = GenericMessage [payload = org.springframework.kafka.support.KafkaNull@2d99561c, headers = {__Key_TypeId__ = [B@19a2dc5f, kafka_offset = 453473, OPERATION = [B@7d75c01a, kafka_consumer = org.apache.kafka.clients.consumer.KafkaConsumer@363f44ef, kafka_timestampType = CREATE_TIME, kafka_receivedPartitionId = 0, kafka_receivedMessageKey = { "orgId": "1", "orderId": "U4000024004" }, kafka_receivedTopic = test_1order, kafka_receivedTimestamp = 1601962346576, kafka_groupId = reservation_group_id}]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String): 1 error(s): [Error in object 'payload': codes []; arguments []; default message [Payload value must not be empty]] , failedMessage = GenericMessage [payload = org.springframework.kafka.support.KafkaNull@2d99561c, headers = {__Key_TypeId__ = [B@19a2dc5f, kafka_offset = 453473, OPERATION = [B@7d75c01a, kafka_consumer = org.apache.kafka.clients.consumer.KafkaConsumer@363f44ef, kafka_timestampType = CREATE_TIME, kafka_receivedPartitionId = 0, kafka_receivedMessageKey = { "orgId": "1", "orderId": "U4000024004" }, kafka_receivedTopic = test_1order, kafka_receivedTimestamp = 1601962346576, kafka_groupId = reservation_group_id}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1925)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1913)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1812)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1739)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1636)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1366)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1082)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:990)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String): 1 error(s): [Error in object 'payload': codes []; arguments []; default message [Payload value must not be empty]]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:122)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:901)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1880)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1862)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1799)
    ... 8 common frames omitted

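The failing record has a null value (the log shows value = null and serialized value size = -1), which Spring Kafka hands to the listener as a KafkaNull payload. By default the payload argument is required, so argument resolution fails. You need to specify that the payload is not required.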

@Payload(required = false) String payload, ...
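
For context, here is a minimal sketch of how the full listener from the question might look with that change applied. The class name ReservationConsumer and the helper methods handleDelete and handleUpdate are hypothetical placeholders, not part of the original code; the annotation attributes and header constants are copied from the question and assume the same spring-kafka 2.x version, where KafkaHeaders.RECEIVED_MESSAGE_KEY is available.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

// Hypothetical class name; the question's class is FLReservationKafkaConsumer.
@Component
public class ReservationConsumer {

    @KafkaListener(topics = "${kafka.consumer-topic-name.reservation}",
            groupId = "${kafka.consumer-group-id.test}",
            containerFactory = "kafkaListenerContainerFactory",
            autoStartup = "${kafka.auto-start.consumer.tets}")
    public void consumeReservation(
            // required = false lets a record with a null value reach the method as null
            @Payload(required = false) String payload,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String kafkaKey) {

        if (payload == null) {
            // Null value: treat the record as a delete marker for this key.
            handleDelete(topic, kafkaKey);
            return;
        }
        handleUpdate(topic, kafkaKey, payload);
    }

    // Hypothetical helper: the real delete logic would go here.
    private void handleDelete(String topic, String kafkaKey) {
        System.out.println("Delete for key " + kafkaKey + " on " + topic);
    }

    // Hypothetical helper: the real processing logic would go here.
    private void handleUpdate(String topic, String kafkaKey, String payload) {
        System.out.println("Update for key " + kafkaKey + " on " + topic + ": " + payload);
    }
}
```

With required = false the method has to do its own null check, as sketched above. An empty string is not null and would still go through the normal branch, so guard against it separately if that matters.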