Spring Integration with JMS not working properly
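Hi, I am trying to build a Spring Boot application that uses Spring Integration.
Application 1: publisher
JMS message -> Broker -> queue1
Application 2: subscriber and publisher
Broker -> queue1 -> Transform -> HTTP call -> HTTP response -> JMS message -> Broker -> queue2
Publisher flow: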
    @Configuration
    public class EchoFlowOutBound {

        @Autowired
        private ConnectionFactory connectionFactory;

        @Bean
        public IntegrationFlow toOutboundQueueFlow() {
            return IntegrationFlows.from("requestChannel")
                    .handle(Jms.outboundGateway(connectionFactory)
                            .requestDestination("amq.outbound1"))
                    .get();
        }
    }

    // Gateway
    @MessagingGateway
    public interface EchoGateway {

        @Gateway(requestChannel = "requestChannel")
        String echo(String message);
    }
Subscriber and publisher flow:
    @Configuration
    public class MainOrchestrationFlow {

        @Autowired
        private ConnectionFactory connectionFactory;

        @Autowired
        private QueueChannel jmsOutChannel;

        @Bean
        public IntegrationFlow orchestrationFlow() {
            return IntegrationFlows.from(
                    Jms.messageDrivenChannelAdapter(connectionFactory)
                            .destination("amq.outbound1")
                            .outputChannel(jmsOutChannel))
                    .<String, String>transform(s -> {
                        return s.toLowerCase();
                    })
                    // HTTP part goes here
                    .<String, HttpEntity>transform(HttpEntity::new)
                    .handle(
                            Http.outboundChannelAdapter("http://localhost:8080/uppercase")
                                    .httpMethod(HttpMethod.POST)
                                    .extractPayload(true)
                                    .expectedResponseType(String.class)
                    )
                    // and here HTTP part ends
                    .handle(
                            Jms.outboundAdapter(connectionFactory)
                                    .destination("amq.outbound2")
                    )
                    .get();
        }
    }
When I run the application, I get this error:
    Caused by: org.springframework.integration.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms
        at org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:762) ~[spring-integration-jms-5.0.6.RELEASE.jar:5.0.6.RELEASE]
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
Can anyone tell me what I am doing wrong?
Your problem is that your consumer is not request-reply. It receives a message from amq.outbound1 and sends it to amq.outbound2. That's all: nothing more happens. You have a one-way flow there.
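At the same time, your producer is request-reply: handle(Jms.outboundGateway(connectionFactory)). With the default options for the JMS request-reply scenario, the outbound gateway really does expect a reply on the destination given in the ReplyTo header. Since nothing ever replies, it gives up after the 5000ms timeout and throws the MessageTimeoutException you see.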
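To make the mismatch concrete, here is a plain-Java analogy using in-memory queues. It is only a sketch and not how Spring Integration or JMS is implemented: the class name RequestReplyTimeoutDemo, the Request type, and both helper methods are hypothetical names invented for this illustration. The requester blocks on a private reply queue (standing in for ReplyTo) and gets nothing back if the consumer forwards its result somewhere else.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class RequestReplyTimeoutDemo {

    // Hypothetical stand-in for a JMS message carrying a reply destination (like JMSReplyTo).
    public static final class Request {
        final String payload;
        final BlockingQueue<String> replyTo;

        Request(String payload, BlockingQueue<String> replyTo) {
            this.payload = payload;
            this.replyTo = replyTo;
        }
    }

    // Plays the role of an outbound gateway: send, then wait for a reply.
    // Returns null when no reply arrives within the timeout.
    public static String sendAndReceive(BlockingQueue<Request> requestQueue,
                                        String payload,
                                        long timeoutMs) throws InterruptedException {
        BlockingQueue<String> replyQueue = new ArrayBlockingQueue<>(1);
        requestQueue.put(new Request(payload, replyQueue));
        return replyQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Starts a consumer that handles exactly one request.
    // reply == true:  answers on the request's replyTo queue (request-reply).
    // reply == false: forwards the result to another queue (one-way).
    public static Thread startConsumer(BlockingQueue<Request> requestQueue,
                                       BlockingQueue<String> otherQueue,
                                       boolean reply) {
        Thread consumer = new Thread(() -> {
            try {
                Request request = requestQueue.take();
                String result = request.payload.toLowerCase();
                if (reply) {
                    request.replyTo.put(result);
                } else {
                    otherQueue.put(result);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        return consumer;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Request> queue1 = new ArrayBlockingQueue<>(10);
        BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(10);

        // One-way consumer: the requester times out, as in the question.
        startConsumer(queue1, queue2, false);
        System.out.println("one-way consumer reply: " + sendAndReceive(queue1, "HELLO", 200));

        // Request-reply consumer: the requester gets its answer.
        startConsumer(queue1, queue2, true);
        System.out.println("request-reply consumer reply: " + sendAndReceive(queue1, "HELLO", 2000));
    }
}
```

The first call returns null because the result went to queue2 rather than to the reply queue, which mirrors the timeout in the stack trace; the second call returns "hello".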
So you have to decide for yourself: either you need to send a reply back to the producer, or you only need send-and-forget from that producer. If the latter, see Jms.outboundAdapter().
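For the send-and-forget option, the publisher from the question might look roughly like the sketch below. It assumes Spring Integration 5.0.x package names and keeps the question's class, bean, channel, and queue names. Changing the gateway method to return void is my assumption about what "send-and-forget" implies for the caller, since nothing will come back.

```java
import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;

@Configuration
public class EchoFlowOutBound {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public IntegrationFlow toOutboundQueueFlow() {
        return IntegrationFlows.from("requestChannel")
                // One-way send: no ReplyTo is awaited, so no timeout can occur here.
                .handle(Jms.outboundAdapter(connectionFactory)
                        .destination("amq.outbound1"))
                .get();
    }
}

// In its own file. Returns void because no reply is expected (assumption).
@MessagingGateway
interface EchoGateway {

    @Gateway(requestChannel = "requestChannel")
    void echo(String message);
}
```

With this variant the subscriber flow from the question can keep publishing its result to amq.outbound2, and whoever needs the result has to listen on that queue.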
In the request-reply case you don't need a Jms.outboundAdapter() on the consumer side: you have to use Jms.inboundGateway() instead of Jms.messageDrivenChannelAdapter().
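A request-reply version of the consumer could be sketched as follows, again assuming Spring Integration 5.0.x and the names from the question. One change goes beyond what the answer states: the sketch uses Http.outboundGateway() rather than Http.outboundChannelAdapter(), on the assumption that the HTTP response should become the JMS reply. A channel adapter is one-way and produces no message for the rest of the flow.

```java
import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.http.dsl.Http;
import org.springframework.integration.jms.dsl.Jms;

@Configuration
public class MainOrchestrationFlow {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public IntegrationFlow orchestrationFlow() {
        return IntegrationFlows.from(
                // Inbound gateway: the flow's final output is sent to the request's ReplyTo.
                Jms.inboundGateway(connectionFactory)
                        .destination("amq.outbound1"))
                .<String, String>transform(s -> s.toLowerCase())
                // Assumption: the HTTP response body is what the producer should receive.
                .handle(Http.outboundGateway("http://localhost:8080/uppercase")
                        .httpMethod(HttpMethod.POST)
                        .expectedResponseType(String.class))
                .get();
    }
}
```

Because the flow ends with a reply-producing handler and no explicit output channel, its result goes back to the inbound gateway, which replies to the producer's Jms.outboundGateway(). Nothing is written to amq.outbound2 in this variant.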