Having trouble with contentType going from AMQP to STOMP message

Here is my (abridged) configuration:

<int-amqp:inbound-channel-adapter 
    auto-startup="false" 
    listener-container="clientListenerContainer" 
    channel="fromRabbitClientChannel" 
    connection-factory="rabbitConnectionFactory" />

<bean id="clientListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="rabbitConnectionFactory" />
</bean>

<int:service-activator ref="subscriptionHandler" method="convertMessage" input-channel="fromRabbitClientChannel" output-channel="sendMessage" />

<bean id="stompSubProtocolHandler" class="org.springframework.web.socket.messaging.StompSubProtocolHandler"/>

<int-websocket:outbound-channel-adapter channel="sendMessage" 
                                            container="serverWebSocketContainer"
                                            default-protocol-handler="stompSubProtocolHandler"/>

Here is my subscriptionHandler convertMessage function:

public Message convertMessage(Message message){
    String subscriptionQuery = AmqpMessageHeaderAccessor.wrap(message).getReceivedRoutingKey();
    ClientSubscription subscription = this.subscriptions.getBySubscriptionQuery(subscriptionQuery);

    Message msg = MessageBuilder.withPayload(message.getPayload())
            .setHeader(SimpMessageHeaderAccessor.SESSION_ID_HEADER, subscription.getSessionId())
            .setHeader(SimpMessageHeaderAccessor.DESTINATION_HEADER, subscription.getSubscriptionQuery())
            .setHeader(SimpMessageHeaderAccessor.SUBSCRIPTION_ID_HEADER, subscription.getSubscriptionId()).build();
    return msg;
}

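When a message comes in through the <int-amqp:inbound-channel-adapter> backed by my SimpleMessageListenerContainer, the payload is a byte[] and the contentType header is set to application/octet-stream.

My convertMessage function then builds a new message from that payload with some STOMP headers and returns it to the sendMessage channel, where the stompSubProtocolHandler takes over. It is there, while trying to convert the headers into a STOMP message, that the exception java.lang.String cannot be cast to org.springframework.util.MimeType is thrown.

Here is the stack trace: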

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.util.MimeType
    at org.springframework.messaging.support.MessageHeaderAccessor.getContentType(MessageHeaderAccessor.java:457)
    at org.springframework.messaging.simp.stomp.StompHeaderAccessor.updateStompHeadersFromSimpMessageHeaders(StompHeaderAccessor.java:169)
    at org.springframework.messaging.simp.stomp.StompHeaderAccessor.<init>(StompHeaderAccessor.java:127)
    at org.springframework.messaging.simp.stomp.StompHeaderAccessor.wrap(StompHeaderAccessor.java:478)
    at org.springframework.web.socket.messaging.StompSubProtocolHandler.getStompHeaderAccessor(StompSubProtocolHandler.java:402)
    at org.springframework.web.socket.messaging.StompSubProtocolHandler.handleMessageToClient(StompSubProtocolHandler.java:329)
    at org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler.handleMessageInternal(WebSocketOutboundMessageHandler.java:151)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)

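I suspect I am not adding a header that I need, or maybe I need to use a message converter... or maybe I am just not putting the pieces together correctly.

What do I need to do to the message to prepare it for the stompSubProtocolHandler?

Am I handling the messages from AMQP correctly by using the <int-amqp:inbound-channel-adapter> with a SimpleMessageListenerContainer? (I need the container so that I can add more queues dynamically as more subscriptions are made.)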

Good work!

Would you mind sharing it with the community as a sample once it is ready? For example, as a pull request to this project: https://github.com/spring-projects/spring-integration-samples.

Since your stack trace shows that the contentType header is a String, you can override it in your subscriptionHandler:

AmqpMessageHeaderAccessor amqpMessageHeaderAccessor = AmqpMessageHeaderAccessor.wrap(message);
....
.setHeader(MessageHeaders.CONTENT_TYPE, amqpMessageHeaderAccessor.getContentType())
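
To show why overriding the header helps, here is a minimal JDK-only sketch of the failure and the fix. It does not use Spring at all: SimpleMimeType, readWithCast and normalize are hypothetical stand-ins invented for this illustration (for org.springframework.util.MimeType, for a consumer that casts the header, and for the override step respectively), and the header name "contentType" is assumed.

```java
import java.util.HashMap;
import java.util.Map;

final class ContentTypeHeaderSketch {

    // Assumed header name, for illustration only.
    static final String CONTENT_TYPE = "contentType";

    /** Hypothetical stand-in for org.springframework.util.MimeType (no parameter support). */
    static final class SimpleMimeType {
        final String type;
        final String subtype;

        SimpleMimeType(String type, String subtype) {
            this.type = type;
            this.subtype = subtype;
        }

        /** Parses a plain "type/subtype" value; anything else is rejected. */
        static SimpleMimeType parse(String value) {
            int slash = value.indexOf('/');
            if (slash <= 0 || slash == value.length() - 1) {
                throw new IllegalArgumentException("Not a type/subtype pair: " + value);
            }
            return new SimpleMimeType(value.substring(0, slash), value.substring(slash + 1));
        }

        @Override
        public String toString() {
            return type + "/" + subtype;
        }
    }

    /** Mimics a consumer that casts the header value without checking its runtime type. */
    static SimpleMimeType readWithCast(Map<String, Object> headers) {
        return (SimpleMimeType) headers.get(CONTENT_TYPE);
    }

    /** Returns a copy of the headers in which a String contentType is replaced by a typed value. */
    static Map<String, Object> normalize(Map<String, Object> headers) {
        Map<String, Object> copy = new HashMap<>(headers);
        Object value = copy.get(CONTENT_TYPE);
        if (value instanceof String) {
            copy.put(CONTENT_TYPE, SimpleMimeType.parse((String) value));
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(CONTENT_TYPE, "application/octet-stream"); // String, as delivered in the question

        try {
            readWithCast(headers);
        } catch (ClassCastException e) {
            System.out.println("Unchecked cast failed: the header value is a String");
        }

        // After normalization the same cast succeeds.
        System.out.println("Normalized: " + readWithCast(normalize(headers)));
    }
}
```

In the real flow, the .setHeader(MessageHeaders.CONTENT_TYPE, ...) call above plays the role of normalize: it puts a typed value into the outgoing message before StompSubProtocolHandler reads it.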

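UPDATE

By the way, we recently fixed the contentType ambiguity: https://jira.spring.io/browse/SPR-12730. With the latest SF 4.1.5 you no longer need to care about the contentType conversion; the framework does it automatically.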