Spring 如果成功传送到队列,则集成 DSL 回复成功

Spring Integration DSL reply success if successful delivery to queue

我正在尝试公开 HTTP 端点,该端点会将消息丢弃到 JMS 队列我想在传递成功时回复成功,在无法传递消息时回复失败。

 @Bean
    public IntegrationFlow systemTaskCall(MapToServiceTaskConfigTransformer mapTransformer, CachingConnectionFactory jmsConnectionFactory) {
        return IntegrationFlows.from(
                Http.inboundGateway("/spartaSystemTask")
                        .requestMapping(r -> r
                                .methods(HttpMethod.POST)
                                .consumes("application/json")
                        )
                        .requestPayloadType(Map.class)
                        .replyChannel(RESPONSE_CHANNEL)
                        .errorChannel("errorChannel")
        )
                .handle((payload, headers) -> mapTransformer.transform((Map<String, String>) payload))
                .enrichHeaders(Collections.singletonMap(DESTINATION_QUEUE, "request.queue"))
                .enrichHeaders(Collections.singletonMap(JMS_REPLY_TO, "response.queue"))
                .transform(Transformers.toJson())
                .handle(
                        Jms.outboundGateway(jmsConnectionFactory,)
                            .requestDestination(message -> message.getHeaders().get(DESTINATION_QUEUE))

                )
                .log(LoggingHandler.Level.ERROR)
                .enrichHeaders(
                        c -> c.header(org.springframework.integration.http.HttpHeaders.STATUS_CODE, HttpStatus.CREATED)
                )
                .transform(source -> "SUCCESS")
                .transform(Transformers.toJson())
                .channel(RESPONSE_CHANNEL)
                .get();
    }

 @Bean
    public IntegrationFlow errorFlow(){
        return IntegrationFlows.from("errorChannel")
                .transform(source -> "error")
                .transform(Transformers.toJson())
                .channel(RESPONSE_CHANNEL)
                .get();
    }

当我调用此 URL 时,消息被丢弃但 HTTP 调用超时。似乎 post JMS Outbound 网关调用的其余代码未执行。

如果消息传递失败,我会收到正确的回复。

你有这样的配置:

.handle(
                    Jms.outboundGateway(jmsConnectionFactory,)
                        .requestDestination(message -> message.getHeaders().get(DESTINATION_QUEUE))

            )
  • 一个outboundGateway。这意味着您发送了一个请求并期望来自另一方的响应,但看起来您只将一条 JMS 消息发送到队列中,而另一方的侦听器端没有人回答您 response.queue。这就是正常 JMS 发布超时的原因。

您需要确保您的流程中的逻辑是正确的,并且它在您的分布式解决方案中确实有效,您希望从服务器端获得一些回复。

否则,您需要考虑将您的逻辑更改为 Jms.outboundAdapter(),这实际上是单向发件人。对于 HTTP 回复,您可以使用带有此 Jms.outboundAdapter()publishSubscribeChannel() 作为第一个订阅者,而您的流程的其余部分作为第二个订阅者。这样,在第一个订阅者正确完成其逻辑之前,不会调用第二个订阅者。对于错误情况,您可以用 ExpressionEvaluatingRequestHandlerAdvice 包裹 Jms.outboundAdapter()https://docs.spring.io/spring-integration/docs/5.2.3.RELEASE/reference/html/messaging-endpoints.html#message-handler-advice-chain