无法在 Kinesis Binder 中以批处理模式使用消息

Unable to consume messages as batch mode in Kinesis Binder

我正在尝试批量处理来自 Kinesis 流的消息

我正在使用

compile('org.springframework.cloud:spring-cloud-starter-stream-kinesis:1.0.0.BUILD-SNAPSHOT')

Application.yml

spring: 
  cloud:
    stream:
      bindings:
        input: 
          group: groupName
          destination: stream-name
          content-type: application/json
          consumer: 
           listenerMode: batch
           idleBetweenPolls: 10000

代码

根据文档,当 listenerMode 为批处理时,它应该将列表作为有效负载

@StreamListener(Sink.INPUT)
public void message(List<String> messages) {
    System.out.println(messages);
}

我已将消息发送为 {"ab":"aa"}

我收到消息错误

org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.ArrayList` out of START_OBJECT token
 at [Source: (byte[])"{"ab":"aa"}"; line: 1, column: 1]
    at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.access00(KinesisMessageDrivenChannelAdapter.java:86) ~[spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
    at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.performSend(KinesisMessageDrivenChannelAdapter.java:1010) [spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
    at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.processRecords(KinesisMessageDrivenChannelAdapter.java:980) [spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
    at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.lambda$processTask(KinesisMessageDrivenChannelAdapter.java:859) [spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
    at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerInvoker.run(KinesisMessageDrivenChannelAdapter.java:1078) ~[spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_144]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_144]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_144]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot parse payload ; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.ArrayList` out of START_OBJECT token
 at [Source: (byte[])"{"ab":"aa"}"; line: 1, column: 1]
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertParameterizedType(ApplicationJsonMessageMarshallingConverter.java:124) ~[spring-cloud-stream-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:85) ~[spring-cloud-stream-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:181) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:70) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:137) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55) ~[spring-cloud-stream-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    ... 18 common frames omitted
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.ArrayList` out of START_OBJECT token
 at [Source: (byte[])"{"ab":"aa"}"; line: 1, column: 1]
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1342) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1138) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1092) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.handleNonArray(StringCollectionDeserializer.java:266) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.deserialize(StringCollectionDeserializer.java:179) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.deserialize(StringCollectionDeserializer.java:169) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.deserialize(StringCollectionDeserializer.java:21) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4001) ~[jackson-databind-2.9.4.jar:2.9.4]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3109) ~[jackson-databind-2.9.4.jar:2.9.4]
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertParameterizedType(ApplicationJsonMessageMarshallingConverter.java:114) ~[spring-cloud-stream-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    ... 29 common frames omitted

你是mis-reading文档;由于这是一个 kinesis-specific 属性,您需要在 YAML 中添加一个 kinesis: 元素。

https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc#kinesis-consumer-properties

The following properties are available for Kinesis consumers only and must be prefixed with spring.cloud.stream.kinesis.bindings.<channelName>.consumer.