Spring 如果成功传送到队列,则集成 DSL 回复成功
Spring Integration DSL reply success if successful delivery to queue
我正在尝试公开 HTTP 端点,该端点会将消息丢弃到 JMS 队列我想在传递成功时回复成功,在无法传递消息时回复失败。
@Bean
public IntegrationFlow systemTaskCall(MapToServiceTaskConfigTransformer mapTransformer, CachingConnectionFactory jmsConnectionFactory) {
return IntegrationFlows.from(
Http.inboundGateway("/spartaSystemTask")
.requestMapping(r -> r
.methods(HttpMethod.POST)
.consumes("application/json")
)
.requestPayloadType(Map.class)
.replyChannel(RESPONSE_CHANNEL)
.errorChannel("errorChannel")
)
.handle((payload, headers) -> mapTransformer.transform((Map<String, String>) payload))
.enrichHeaders(Collections.singletonMap(DESTINATION_QUEUE, "request.queue"))
.enrichHeaders(Collections.singletonMap(JMS_REPLY_TO, "response.queue"))
.transform(Transformers.toJson())
.handle(
Jms.outboundGateway(jmsConnectionFactory,)
.requestDestination(message -> message.getHeaders().get(DESTINATION_QUEUE))
)
.log(LoggingHandler.Level.ERROR)
.enrichHeaders(
c -> c.header(org.springframework.integration.http.HttpHeaders.STATUS_CODE, HttpStatus.CREATED)
)
.transform(source -> "SUCCESS")
.transform(Transformers.toJson())
.channel(RESPONSE_CHANNEL)
.get();
}
@Bean
public IntegrationFlow errorFlow(){
return IntegrationFlows.from("errorChannel")
.transform(source -> "error")
.transform(Transformers.toJson())
.channel(RESPONSE_CHANNEL)
.get();
}
当我调用此 URL 时,消息被丢弃但 HTTP 调用超时。似乎 post JMS Outbound 网关调用的其余代码未执行。
如果消息传递失败,我会收到正确的回复。
你有这样的配置:
.handle(
Jms.outboundGateway(jmsConnectionFactory,)
.requestDestination(message -> message.getHeaders().get(DESTINATION_QUEUE))
)
- 一个
outboundGateway
。这意味着您发送了一个请求并期望来自另一方的响应,但看起来您只将一条 JMS 消息发送到队列中,而另一方的侦听器端没有人回答您 response.queue
。这就是正常 JMS 发布超时的原因。
您需要确保您的流程中的逻辑是正确的,并且它在您的分布式解决方案中确实有效,您希望从服务器端获得一些回复。
否则,您需要考虑将您的逻辑更改为 Jms.outboundAdapter()
,这实际上是单向发件人。对于 HTTP 回复,您可以使用带有此 Jms.outboundAdapter()
的 publishSubscribeChannel()
作为第一个订阅者,而您的流程的其余部分作为第二个订阅者。这样,在第一个订阅者正确完成其逻辑之前,不会调用第二个订阅者。对于错误情况,您可以用 ExpressionEvaluatingRequestHandlerAdvice
包裹 Jms.outboundAdapter()
:https://docs.spring.io/spring-integration/docs/5.2.3.RELEASE/reference/html/messaging-endpoints.html#message-handler-advice-chain
我正在尝试公开 HTTP 端点,该端点会将消息丢弃到 JMS 队列我想在传递成功时回复成功,在无法传递消息时回复失败。
@Bean
public IntegrationFlow systemTaskCall(MapToServiceTaskConfigTransformer mapTransformer, CachingConnectionFactory jmsConnectionFactory) {
return IntegrationFlows.from(
Http.inboundGateway("/spartaSystemTask")
.requestMapping(r -> r
.methods(HttpMethod.POST)
.consumes("application/json")
)
.requestPayloadType(Map.class)
.replyChannel(RESPONSE_CHANNEL)
.errorChannel("errorChannel")
)
.handle((payload, headers) -> mapTransformer.transform((Map<String, String>) payload))
.enrichHeaders(Collections.singletonMap(DESTINATION_QUEUE, "request.queue"))
.enrichHeaders(Collections.singletonMap(JMS_REPLY_TO, "response.queue"))
.transform(Transformers.toJson())
.handle(
Jms.outboundGateway(jmsConnectionFactory,)
.requestDestination(message -> message.getHeaders().get(DESTINATION_QUEUE))
)
.log(LoggingHandler.Level.ERROR)
.enrichHeaders(
c -> c.header(org.springframework.integration.http.HttpHeaders.STATUS_CODE, HttpStatus.CREATED)
)
.transform(source -> "SUCCESS")
.transform(Transformers.toJson())
.channel(RESPONSE_CHANNEL)
.get();
}
@Bean
public IntegrationFlow errorFlow(){
return IntegrationFlows.from("errorChannel")
.transform(source -> "error")
.transform(Transformers.toJson())
.channel(RESPONSE_CHANNEL)
.get();
}
当我调用此 URL 时,消息被丢弃但 HTTP 调用超时。似乎 post JMS Outbound 网关调用的其余代码未执行。
如果消息传递失败,我会收到正确的回复。
你有这样的配置:
.handle(
Jms.outboundGateway(jmsConnectionFactory,)
.requestDestination(message -> message.getHeaders().get(DESTINATION_QUEUE))
)
- 一个
outboundGateway
。这意味着您发送了一个请求并期望来自另一方的响应,但看起来您只将一条 JMS 消息发送到队列中,而另一方的侦听器端没有人回答您response.queue
。这就是正常 JMS 发布超时的原因。
您需要确保您的流程中的逻辑是正确的,并且它在您的分布式解决方案中确实有效,您希望从服务器端获得一些回复。
否则,您需要考虑将您的逻辑更改为 Jms.outboundAdapter()
,这实际上是单向发件人。对于 HTTP 回复,您可以使用带有此 Jms.outboundAdapter()
的 publishSubscribeChannel()
作为第一个订阅者,而您的流程的其余部分作为第二个订阅者。这样,在第一个订阅者正确完成其逻辑之前,不会调用第二个订阅者。对于错误情况,您可以用 ExpressionEvaluatingRequestHandlerAdvice
包裹 Jms.outboundAdapter()
:https://docs.spring.io/spring-integration/docs/5.2.3.RELEASE/reference/html/messaging-endpoints.html#message-handler-advice-chain