Failed to convert message 异常与 RabbitMQ source + Log sink stream

Failed to convert message exception with RabbitMQ source + Log sink stream

我正在试用 Spring 云数据流,今天我更新到最新版本,因为我无法创建这个应该简单记录 AMQP 消息的简单示例...

rabbit | log

当我部署这个流并简单地在消费队列上发布一个字符串消息时,这工作正常。但当它是一个序列化的 PoJo 时,它就不会。数据流服务器的旧版本 + 基于 spring 启动 1.5.x 启动的应用程序就是这样做的。

Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={"absolute_path":"/+~JF4472914347363856925.tmp","filename":"+~JF4472914347363856925.tmp","timestamp":1536315010932,"sshd_server":"localhost","sshd_port":22}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=sftp.uploaded, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_consumerQueue=sftp_uploaded, amqp_redelivered=false, id=d3d84d90-53ca-4c39-cdef-8665d35ddcf1, amqp_consumerTag=amq.ctag-8kP4KDjn13oae1Qutmw4IA, contentType=text/json, timestamp=1536315013950}]' to outbound message.
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:163) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:475) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access0(AmqpInboundChannelAdapter.java:60) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:240) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:207) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
    ... 22 common frames omitted
Caused by: java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={"absolute_path":"/+~JF4472914347363856925.tmp","filename":"+~JF4472914347363856925.tmp","timestamp":1536315010932,"sshd_server":"localhost","sshd_port":22}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=sftp.uploaded, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_consumerQueue=sftp_uploaded, amqp_redelivered=false, id=d3d84d90-53ca-4c39-cdef-8665d35ddcf1, amqp_consumerTag=amq.ctag-8kP4KDjn13oae1Qutmw4IA, contentType=text/json, timestamp=1536315013950}]' to outbound message.
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:324) ~[spring-cloud-stream-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:351) ~[spring-cloud-stream-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:589) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:435) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    ... 32 common frames omitted

版本

Spring Cloud Stream 2.* 中的内容类型协商发生了重大变化和增强。您可以在此处阅读 https://docs.spring.io/spring-cloud-stream/docs/Fishtown.BUILD-SNAPSHOT/reference/htmlsingle/#content-type-management。 基本上我所看到的是堆栈中没有合适的 MessageConverter。 另外,据我所知,您的内容类型是 text/json,我们没有为其提供转换器。考虑将其更改为 application/json.