contentType 从 AMQP 到 STOMP 消息时遇到问题
Having trouble with contentType going from AMQP to STOMP message
这是我的(总结)
<int-amqp:inbound-channel-adapter
auto-startup="false"
listener-container="clientListenerContainer"
channel="fromRabbitClientChannel"
connection-factory="rabbitConnectionFactory" />
<bean id="clientListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="rabbitConnectionFactory" />
</bean>
<int:service-activator ref="subscriptionHandler" method="convertMessage" input-channel="fromRabbitClientChannel" output-channel="sendMessage" />
<bean id="stompSubProtocolHandler" class="org.springframework.web.socket.messaging.StompSubProtocolHandler"/>
<int-websocket:outbound-channel-adapter channel="sendMessage"
container="serverWebSocketContainer"
default-protocol-handler="stompSubProtocolHandler"/>
这是我的 subscriptionHandler convertMessage 函数
public Message convertMessage(Message message){
String subscriptionQuery = AmqpMessageHeaderAccessor.wrap(message).getReceivedRoutingKey();
ClientSubscription subscription = this.subscriptions.getBySubscriptionQuery(subscriptionQuery);
Message msg = MessageBuilder.withPayload(message.getPayload())
.setHeader(SimpMessageHeaderAccessor.SESSION_ID_HEADER, subscription.getSessionId())
.setHeader(SimpMessageHeaderAccessor.DESTINATION_HEADER, subscription.getSubscriptionQuery())
.setHeader(SimpMessageHeaderAccessor.SUBSCRIPTION_ID_HEADER, subscription.getSubscriptionId()).build();
return msg;
}
当消息通过 <int-amqp:inbound-channel-adapter>
、SimpleMessageListenerContainer
传入时,对于我的“”,有效载荷是一个字节 [] 并且 contentType header 设置为 application/octet-stream.
然后我的 convertMessage 函数使用一些 STOMP headers 和 returns 从负载创建一条消息到 stompSubProtocolHandler 接管的 sendMessage 通道。它在那里,当它试图将 headers 转换为 stomp 消息时,即抛出异常 java.lang.String cannot be cast to org.springframework.util.MimeType.
这是堆栈跟踪。
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.util.MimeType
at org.springframework.messaging.support.MessageHeaderAccessor.getContentType(MessageHeaderAccessor.java:457)
at org.springframework.messaging.simp.stomp.StompHeaderAccessor.updateStompHeadersFromSimpMessageHeaders(StompHeaderAccessor.java:169)
at org.springframework.messaging.simp.stomp.StompHeaderAccessor.<init>(StompHeaderAccessor.java:127)
at org.springframework.messaging.simp.stomp.StompHeaderAccessor.wrap(StompHeaderAccessor.java:478)
at org.springframework.web.socket.messaging.StompSubProtocolHandler.getStompHeaderAccessor(StompSubProtocolHandler.java:402)
at org.springframework.web.socket.messaging.StompSubProtocolHandler.handleMessageToClient(StompSubProtocolHandler.java:329)
at org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler.handleMessageInternal(WebSocketOutboundMessageHandler.java:151)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
我觉得我没有添加我需要的 header,或者我可能需要使用消息转换器...或者我可能只是没有一起做对。
我需要对消息做些什么来为 stompSubProtocolHandler 做准备?
我是否通过使用带有 SimpleMessageListenerContainer
的 <int-amqp:inbound-channel-adapter>
正确处理来自 AMQP 的消息? (我需要容器在进行更多订阅时动态添加更多 queues)
干得好!
您是否介意在准备就绪时将其作为示例分享给社区?
作为此项目的 PullRequest:https://github.com/spring-projects/spring-integration-samples.
因为你的 StackTrace 说 contentType
是 String
,你可以在你的 subscriptionHandler
:
中覆盖它
AmqpMessageHeaderAccessor amqpMessageHeaderAccessor = AmqpMessageHeaderAccessor.wrap(message);
....
.setHeader(MessageHeaders.CONTENT_TYPE, amqpMessageHeaderAccessor.getContentType())
更新
顺便说一句,我们最近修复了 contentType
歧义:https://jira.spring.io/browse/SPR-12730。使用最新的 SF 4.1.5
,您无需再关心 contentType
转换。它在框架中自动完成。
这是我的(总结)
<int-amqp:inbound-channel-adapter
auto-startup="false"
listener-container="clientListenerContainer"
channel="fromRabbitClientChannel"
connection-factory="rabbitConnectionFactory" />
<bean id="clientListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="rabbitConnectionFactory" />
</bean>
<int:service-activator ref="subscriptionHandler" method="convertMessage" input-channel="fromRabbitClientChannel" output-channel="sendMessage" />
<bean id="stompSubProtocolHandler" class="org.springframework.web.socket.messaging.StompSubProtocolHandler"/>
<int-websocket:outbound-channel-adapter channel="sendMessage"
container="serverWebSocketContainer"
default-protocol-handler="stompSubProtocolHandler"/>
这是我的 subscriptionHandler convertMessage 函数
public Message convertMessage(Message message){
String subscriptionQuery = AmqpMessageHeaderAccessor.wrap(message).getReceivedRoutingKey();
ClientSubscription subscription = this.subscriptions.getBySubscriptionQuery(subscriptionQuery);
Message msg = MessageBuilder.withPayload(message.getPayload())
.setHeader(SimpMessageHeaderAccessor.SESSION_ID_HEADER, subscription.getSessionId())
.setHeader(SimpMessageHeaderAccessor.DESTINATION_HEADER, subscription.getSubscriptionQuery())
.setHeader(SimpMessageHeaderAccessor.SUBSCRIPTION_ID_HEADER, subscription.getSubscriptionId()).build();
return msg;
}
当消息通过 <int-amqp:inbound-channel-adapter>
、SimpleMessageListenerContainer
传入时,对于我的“”,有效载荷是一个字节 [] 并且 contentType header 设置为 application/octet-stream.
然后我的 convertMessage 函数使用一些 STOMP headers 和 returns 从负载创建一条消息到 stompSubProtocolHandler 接管的 sendMessage 通道。它在那里,当它试图将 headers 转换为 stomp 消息时,即抛出异常 java.lang.String cannot be cast to org.springframework.util.MimeType.
这是堆栈跟踪。
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.util.MimeType
at org.springframework.messaging.support.MessageHeaderAccessor.getContentType(MessageHeaderAccessor.java:457)
at org.springframework.messaging.simp.stomp.StompHeaderAccessor.updateStompHeadersFromSimpMessageHeaders(StompHeaderAccessor.java:169)
at org.springframework.messaging.simp.stomp.StompHeaderAccessor.<init>(StompHeaderAccessor.java:127)
at org.springframework.messaging.simp.stomp.StompHeaderAccessor.wrap(StompHeaderAccessor.java:478)
at org.springframework.web.socket.messaging.StompSubProtocolHandler.getStompHeaderAccessor(StompSubProtocolHandler.java:402)
at org.springframework.web.socket.messaging.StompSubProtocolHandler.handleMessageToClient(StompSubProtocolHandler.java:329)
at org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler.handleMessageInternal(WebSocketOutboundMessageHandler.java:151)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
我觉得我没有添加我需要的 header,或者我可能需要使用消息转换器...或者我可能只是没有一起做对。
我需要对消息做些什么来为 stompSubProtocolHandler 做准备?
我是否通过使用带有 SimpleMessageListenerContainer
的 <int-amqp:inbound-channel-adapter>
正确处理来自 AMQP 的消息? (我需要容器在进行更多订阅时动态添加更多 queues)
干得好!
您是否介意在准备就绪时将其作为示例分享给社区? 作为此项目的 PullRequest:https://github.com/spring-projects/spring-integration-samples.
因为你的 StackTrace 说 contentType
是 String
,你可以在你的 subscriptionHandler
:
AmqpMessageHeaderAccessor amqpMessageHeaderAccessor = AmqpMessageHeaderAccessor.wrap(message);
....
.setHeader(MessageHeaders.CONTENT_TYPE, amqpMessageHeaderAccessor.getContentType())
更新
顺便说一句,我们最近修复了 contentType
歧义:https://jira.spring.io/browse/SPR-12730。使用最新的 SF 4.1.5
,您无需再关心 contentType
转换。它在框架中自动完成。