Listener method could not be invoked with the incoming message and Backoff none exhausted for ConsumerRecord
Below is the method definition for my Kafka listener. I think I get the following error when an empty or null string is received as the payload. Can you help?
@KafkaListener(topics = "${kafka.consumer-topic-name.reservation}", groupId = "${kafka.consumer-group-id.test}",
        containerFactory = "kafkaListenerContainerFactory", autoStartup = "${kafka.auto-start.consumer.tets}")
public void consumeReservation(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String kafkaKey) {}
[org.springframework.kafka.KafkaListenerEndpointContainer #0-0-C-1] ERROR o.s.k.l.SeekToCurrentErrorHandler - Backoff none exhausted for ConsumerRecord(topic = test_topic, partition = 0, leaderEpoch = 2, offset = 453473, CreateTime = 1601962346576, serialized key size = 41, serialized value size = -1, headers = RecordHeaders(headers = [RecordHeader(key = OPERATION, value = [68, 69, 76, 69, 84, 69]), RecordHeader(key = __Key_TypeId__, value = [99, 108, 75, 101, 121])], isReadOnly = false), key = {
"orgId": "1",
"orderId": "U4000024004"}, value = null)
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String)]
Bean [com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer@731702d1];
nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String): 1 error(s): [Error in object 'payload': codes []; arguments [];
default message [Payload value must not be empty]
], failedMessage = GenericMessage [payload = org.springframework.kafka.support.KafkaNull@2d99561c, headers = {
__Key_TypeId__ = [B@19a2dc5f, kafka_offset = 453473, OPERATION = [B@7d75c01a, kafka_consumer = org.apache.kafka.clients.consumer.KafkaConsumer@363f44ef, kafka_timestampType = CREATE_TIME, kafka_receivedPartitionId = 0, kafka_receivedMessageKey = {
"orgId": "1",
"orderId": "U4000024004"
}, kafka_receivedTopic = test_1order, kafka_receivedTimestamp = 1601962346576, kafka_groupId = reservation_group_id
}]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String): 1 error(s): [Error in object 'payload': codes []; arguments [];
default message [Payload value must not be empty]
],
failedMessage = GenericMessage [payload = org.springframework.kafka.support.KafkaNull@2d99561c, headers = {
__Key_TypeId__ = [B@19a2dc5f, kafka_offset = 453473, OPERATION = [B@7d75c01a, kafka_consumer = org.apache.kafka.clients.consumer.KafkaConsumer@363f44ef, kafka_timestampType = CREATE_TIME, kafka_receivedPartitionId = 0, kafka_receivedMessageKey = {
"orgId": "1",
"orderId": "U4000024004"
}, kafka_receivedTopic = test_1order, kafka_receivedTimestamp = 1601962346576, kafka_groupId = reservation_group_id
}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1925)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1913)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1812)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1739)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1636)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1366)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1082)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:990)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String): 1 error(s): [Error in object 'payload': codes []; arguments [];
default message [Payload value must not be empty]
]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:122)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:901)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1880)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1862)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1799)
    ... 8 common frames omitted
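The record in your log has value = null, and by default the listener's payload parameter is required. You need to specify that the payload is not required:
@Payload(required = false) String payload, ...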
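Once the payload is marked as not required, the listener is invoked with payload == null for records like the one in the log (value = null), so the method body has to cope with that. A minimal sketch of one way to do it follows, in plain Java so it can be tried without Spring. The ReservationHandler class, its in-memory map, and the choice to treat a null value as a delete are all assumptions for illustration, not part of the original code; what a null value should mean depends on your application.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the class name and the in-memory map are illustrative only.
class ReservationHandler {
    private final Map<String, String> reservations = new HashMap<>();

    // Intended to be called from consumeReservation(...) once its first parameter
    // is declared as: @Payload(required = false) String payload
    void handle(String kafkaKey, String payload) {
        if (payload == null) {
            // Record with a null value: nothing to parse, so drop any state
            // stored for this key (assumed semantics, adjust as needed).
            reservations.remove(kafkaKey);
            return;
        }
        // Normal record: remember the latest payload for this key.
        reservations.put(kafkaKey, payload);
    }

    int size() {
        return reservations.size();
    }
}
```

Inside the listener this would be used as handler.handle(kafkaKey, payload), where handler is an instance of the hypothetical class above.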