如何将 object `Message` 传递给路由?
How can I pass an object `Message` to the route?
我创建了一个流,它使用来自 RabbitMQ 的消息,然后使用路由器按类型分发到适当的服务。
服务中的方法采用参数 Message<?>
,因为我需要在那里使用 headers。但是在这种方法中,我只收到类型为 java.lang.String
而不是 org.springframework.messaging.Message
的消息有效负载,并且
我收到错误 java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message
.
Payload 不适合我,因为我需要从消息中获取headers。
@Bean
public IntegrationFlow testFlow(String queueName,
ConnectionFactory connectionFactory,
Service1 service1,
Service2 service2) {
SimpleMessageListenerContainer consumerListener = new SimpleMessageListenerContainer(connectionFactory);
consumerListener.addQueueNames(queueName);
return IntegrationFlows.from(Amqp.inboundAdapter(consumerListener))
.transform(s -> s, ConsumerEndpointSpec::transactional)
.<Message<?>, String>route(HeadersUtil::getType, m -> m
.subFlowMapping(Type.SERVICE_1, sf -> sf.handle(service1::handleProcedure))
.subFlowMapping(Type.SERVICE_2, sf -> sf.handle(service2::handleProcedure)))
.get();
}
方法handleProcedure
的签名如下:
void handleProcedure(Message<?> message)
我希望在方法 handleProcedure
中得到 headers of Message
,但我现在得到异常。
您正在使用的 .handle
的 handle(GenericHandler<P>)
版本仅获取负载。
如果要接收完整的消息,需要使用不同的重载.handle
,例如handle("service1", "handleProcedure")
或.handle(service1, "handleProcedure")
。
我认为您没有正确理解堆栈跟踪。
您的 void handleProcedure(Message<?> message)
及其 service1::handleProcedure
方法引用完全符合 IntegrationFlowDefinition
.
中的 public B handle(MessageHandler messageHandler) {
方法签名
你的问题在这里:
.<Message<?>, String>route(HeadersUtil::getType,
您的 HeadersUtil::getType
需要一条消息,但 lambda 调用的类型是 payload
,在您的情况下是 String
。
这应该有效:
.<Message<?>, String>route(Message.class, HeadersUtil::getType,
我创建了一个流,它使用来自 RabbitMQ 的消息,然后使用路由器按类型分发到适当的服务。
服务中的方法采用参数 Message<?>
,因为我需要在那里使用 headers。但是在这种方法中,我只收到类型为 java.lang.String
而不是 org.springframework.messaging.Message
的消息有效负载,并且
我收到错误 java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message
.
Payload 不适合我,因为我需要从消息中获取headers。
@Bean
public IntegrationFlow testFlow(String queueName,
ConnectionFactory connectionFactory,
Service1 service1,
Service2 service2) {
SimpleMessageListenerContainer consumerListener = new SimpleMessageListenerContainer(connectionFactory);
consumerListener.addQueueNames(queueName);
return IntegrationFlows.from(Amqp.inboundAdapter(consumerListener))
.transform(s -> s, ConsumerEndpointSpec::transactional)
.<Message<?>, String>route(HeadersUtil::getType, m -> m
.subFlowMapping(Type.SERVICE_1, sf -> sf.handle(service1::handleProcedure))
.subFlowMapping(Type.SERVICE_2, sf -> sf.handle(service2::handleProcedure)))
.get();
}
方法handleProcedure
的签名如下:
void handleProcedure(Message<?> message)
我希望在方法 handleProcedure
中得到 headers of Message
,但我现在得到异常。
您正在使用的 .handle
的 handle(GenericHandler<P>)
版本仅获取负载。
如果要接收完整的消息,需要使用不同的重载.handle
,例如handle("service1", "handleProcedure")
或.handle(service1, "handleProcedure")
。
我认为您没有正确理解堆栈跟踪。
您的 void handleProcedure(Message<?> message)
及其 service1::handleProcedure
方法引用完全符合 IntegrationFlowDefinition
.
public B handle(MessageHandler messageHandler) {
方法签名
你的问题在这里:
.<Message<?>, String>route(HeadersUtil::getType,
您的 HeadersUtil::getType
需要一条消息,但 lambda 调用的类型是 payload
,在您的情况下是 String
。
这应该有效:
.<Message<?>, String>route(Message.class, HeadersUtil::getType,