无法在 Kinesis Binder 中以批处理模式使用消息
Unable to consume messages as batch mode in Kinesis Binder
我正在尝试批量处理来自 Kinesis 流的消息
我正在使用
compile('org.springframework.cloud:spring-cloud-starter-stream-kinesis:1.0.0.BUILD-SNAPSHOT')
Application.yml
spring:
cloud:
stream:
bindings:
input:
group: groupName
destination: stream-name
content-type: application/json
consumer:
listenerMode: batch
idleBetweenPolls: 10000
代码
根据文档,当 listenerMode 为批处理时,它应该将列表作为有效负载
@StreamListener(Sink.INPUT)
public void message(List<String> messages) {
System.out.println(messages);
}
我已将消息发送为 {"ab":"aa"}
我收到消息错误
org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.ArrayList` out of START_OBJECT token
at [Source: (byte[])"{"ab":"aa"}"; line: 1, column: 1]
at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.access00(KinesisMessageDrivenChannelAdapter.java:86) ~[spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.performSend(KinesisMessageDrivenChannelAdapter.java:1010) [spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.processRecords(KinesisMessageDrivenChannelAdapter.java:980) [spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.lambda$processTask(KinesisMessageDrivenChannelAdapter.java:859) [spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerInvoker.run(KinesisMessageDrivenChannelAdapter.java:1078) ~[spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_144]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_144]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.ArrayList` out of START_OBJECT token
at [Source: (byte[])"{"ab":"aa"}"; line: 1, column: 1]
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertParameterizedType(ApplicationJsonMessageMarshallingConverter.java:124) ~[spring-cloud-stream-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:85) ~[spring-cloud-stream-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:181) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:70) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:137) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55) ~[spring-cloud-stream-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
... 18 common frames omitted
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.ArrayList` out of START_OBJECT token
at [Source: (byte[])"{"ab":"aa"}"; line: 1, column: 1]
at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1342) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1138) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1092) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.handleNonArray(StringCollectionDeserializer.java:266) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.deserialize(StringCollectionDeserializer.java:179) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.deserialize(StringCollectionDeserializer.java:169) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.deserialize(StringCollectionDeserializer.java:21) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4001) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3109) ~[jackson-databind-2.9.4.jar:2.9.4]
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertParameterizedType(ApplicationJsonMessageMarshallingConverter.java:114) ~[spring-cloud-stream-2.0.0.RELEASE.jar:2.0.0.RELEASE]
... 29 common frames omitted
你是mis-reading文档;由于这是一个 kinesis-specific 属性,您需要在 YAML 中添加一个 kinesis:
元素。
The following properties are available for Kinesis consumers only and must be prefixed with spring.cloud.stream.kinesis.bindings.<channelName>.consumer.
我正在尝试批量处理来自 Kinesis 流的消息
我正在使用
compile('org.springframework.cloud:spring-cloud-starter-stream-kinesis:1.0.0.BUILD-SNAPSHOT')
Application.yml
spring:
cloud:
stream:
bindings:
input:
group: groupName
destination: stream-name
content-type: application/json
consumer:
listenerMode: batch
idleBetweenPolls: 10000
代码
根据文档,当 listenerMode 为批处理时,它应该将列表作为有效负载
@StreamListener(Sink.INPUT)
public void message(List<String> messages) {
System.out.println(messages);
}
我已将消息发送为 {"ab":"aa"}
我收到消息错误
org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.ArrayList` out of START_OBJECT token
at [Source: (byte[])"{"ab":"aa"}"; line: 1, column: 1]
at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.access00(KinesisMessageDrivenChannelAdapter.java:86) ~[spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.performSend(KinesisMessageDrivenChannelAdapter.java:1010) [spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.processRecords(KinesisMessageDrivenChannelAdapter.java:980) [spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.lambda$processTask(KinesisMessageDrivenChannelAdapter.java:859) [spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerInvoker.run(KinesisMessageDrivenChannelAdapter.java:1078) ~[spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_144]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_144]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.ArrayList` out of START_OBJECT token
at [Source: (byte[])"{"ab":"aa"}"; line: 1, column: 1]
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertParameterizedType(ApplicationJsonMessageMarshallingConverter.java:124) ~[spring-cloud-stream-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:85) ~[spring-cloud-stream-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:181) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:70) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:137) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55) ~[spring-cloud-stream-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
... 18 common frames omitted
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.ArrayList` out of START_OBJECT token
at [Source: (byte[])"{"ab":"aa"}"; line: 1, column: 1]
at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1342) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1138) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1092) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.handleNonArray(StringCollectionDeserializer.java:266) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.deserialize(StringCollectionDeserializer.java:179) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.deserialize(StringCollectionDeserializer.java:169) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.deserialize(StringCollectionDeserializer.java:21) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4001) ~[jackson-databind-2.9.4.jar:2.9.4]
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3109) ~[jackson-databind-2.9.4.jar:2.9.4]
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertParameterizedType(ApplicationJsonMessageMarshallingConverter.java:114) ~[spring-cloud-stream-2.0.0.RELEASE.jar:2.0.0.RELEASE]
... 29 common frames omitted
你是mis-reading文档;由于这是一个 kinesis-specific 属性,您需要在 YAML 中添加一个 kinesis:
元素。
The following properties are available for Kinesis consumers only and must be prefixed with
spring.cloud.stream.kinesis.bindings.<channelName>.consumer.