Spring Integration DSL ScatterGather flow blocks

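I have an integration flow that performs a scatter-gather operation, hitting multiple HTTP endpoints that return JSON, and then aggregates the results into a single JSON object. The flow looks like this: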

@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
             .handle(HeaderPrinter::headerPrinter)
             .enrichHeaders(httpRequestHeaderEnricher())
             .scatterGather(
                scatterer -> scatterer.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
                                                             .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                                      .recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
                                                             .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                                      .applySequence(true),
                gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
            )
            .handle(myService, "handleAggregatedJson")
            .handle(HeaderPrinter::headerPrinter)
            .handle(myOtherService, "handleMyServiceOutput")
            .channel("myFlow.output");
}

I start the flow using a gateway declared as follows:

@MessagingGateway
public interface IMyGateway {

    @Gateway(requestChannel = "myFlow.input", replyChannel = "myFlow.output")
    MyResult startFlow(@Payload String payload, @Header("header1") String header1, @Header("header2") String header2);

}

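The problem I have is that the whole flow blocks and the gateway times out. I have set breakpoints in the two service calls, IMyService::handleAggregatedJson and IMyOtherService::handleMyServiceOutput, and both are executed, but the output never reaches the gateway's reply channel. If I remove the last two handle operations, the flow returns the result through the gateway as normal.

I looked at the stack trace while the flow was blocked, and I can see that the thread running the flow is waiting on a lock: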

java.lang.Thread.State: WAITING
    at sun.misc.Unsafe.park(Unsafe.java:-1)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
    at org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:308)
    at org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:300)
    at org.springframework.messaging.core.GenericMessagingTemplate.doReceive(GenericMessagingTemplate.java:201)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:234)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:45)
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97)
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:38)
    at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:95)
    at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:85)
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:487)
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive(MessagingGatewaySupport.java:461)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:520)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:469)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:460)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
    at com.sun.proxy.$Proxy116.startFlow(Unknown Source:-1)

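As far as I can tell, if the flow takes longer than X, it blocks. I tried putting a rendezvous channel between the flow and the gateway, but it did not seem to help.

Any ideas as to what is causing the timeout?

Addendum: I have been playing around with the code. Removing the return type on the gateway, together with the final .channel call on the flow, seems to stop it from blocking.

The following works fine: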

@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
             .handle(HeaderPrinter::headerPrinter)
             .enrichHeaders(httpRequestHeaderEnricher())
             .scatterGather(
                scatterer -> scatterer.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
                                                             .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                                      .recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
                                                             .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                                      .applySequence(true),
                gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
            )
            .handle(myService, "handleAggregatedJson")
            .handle(HeaderPrinter::headerPrinter)
            .handle(myOtherService, "handleMyServiceOutput")
            .handle(m -> {
                log.info("Flow completed successfully, payload as expected:" + m.getPayload());
            });
}

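I wonder whether your

.handle(myService, "handleAggregatedJson")
.handle(HeaderPrinter::headerPrinter)
.handle(myOtherService, "handleMyServiceOutput")

return some value after the gather step. This is a typical request-reply mistake: some step in the flow stops replying with a meaningful value, so no reply ever travels back to the caller.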
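
To make that failure mode concrete, here is a minimal plain-Java sketch. It is not Spring Integration code: RequestReplySketch, sendAndReceive and the step lambdas are hypothetical names, and the queue only stands in for the temporary reply channel visible in the stack trace. It assumes that a step returning null (as a void handler method effectively does) ends the chain without producing a reply, so the caller simply waits until its timeout expires.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

// Hypothetical model of a request-reply gateway; not the Spring Integration API.
class RequestReplySketch {

    // Runs the steps on another thread and waits for a reply.
    // Returns null when no reply arrives within timeoutMillis.
    static String sendAndReceive(String request, List<UnaryOperator<String>> steps, long timeoutMillis) {
        BlockingQueue<String> temporaryReplyChannel = new ArrayBlockingQueue<>(1);
        Thread flow = new Thread(() -> {
            String payload = request;
            for (UnaryOperator<String> step : steps) {
                payload = step.apply(payload);
                if (payload == null) {
                    return; // nothing to pass downstream, so no reply is ever sent
                }
            }
            temporaryReplyChannel.offer(payload);
        });
        flow.setDaemon(true);
        flow.start();
        try {
            // The caller blocks here, much like the gateway thread in the stack trace.
            return temporaryReplyChannel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        UnaryOperator<String> aggregate = json -> json + "+aggregated";
        UnaryOperator<String> replying = json -> json + "+handled";
        UnaryOperator<String> silent = json -> null; // behaves like a void handler method

        // Every step returns a value, so the caller gets a reply.
        System.out.println(sendAndReceive("json", Arrays.asList(aggregate, replying), 1000));
        // One silent step in the middle: the caller times out and prints null.
        System.out.println(sendAndReceive("json", Arrays.asList(aggregate, silent, replying), 200));
    }
}
```

If your handler methods are declared void or return null, checking them first is probably the quickest way to rule this out.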

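UPDATE

You should consider removing the explicit replyChannel declaration from the @Gateway definition, and removing .channel("myFlow.output") from the end of the flow. That way the reply is sent to the replyChannel header. When you configure an explicit replyChannel, there is no guarantee that some other subscriber on that channel will not "steal" your reply message.

See the Reference Manual for more information.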
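
The "stealing" effect can be shown with a small plain-Java model. This is only a sketch under assumptions: RoundRobinChannel and repliesSeenByGateway are hypothetical names, not Spring classes, and the model assumes a point-to-point channel that hands each message to exactly one subscriber in round-robin order. Under that assumption, a second subscriber on the shared output channel takes every other reply.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a shared reply channel; not the Spring Integration API.
class SharedReplyChannelSketch {

    // Point-to-point channel: each message goes to exactly one subscriber, in round-robin order.
    static final class RoundRobinChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private int next = 0;

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void send(String message) {
            Consumer<String> target = subscribers.get(next % subscribers.size());
            next++;
            target.accept(message);
        }
    }

    // Sends `count` replies to a shared channel and returns the ones the gateway received.
    static List<String> repliesSeenByGateway(int count, boolean extraSubscriber) {
        RoundRobinChannel output = new RoundRobinChannel();
        List<String> gatewayInbox = new ArrayList<>();
        List<String> otherInbox = new ArrayList<>();
        output.subscribe(gatewayInbox::add);
        if (extraSubscriber) {
            output.subscribe(otherInbox::add); // some other consumer of the same named channel
        }
        for (int i = 1; i <= count; i++) {
            output.send("reply-" + i);
        }
        return gatewayInbox;
    }

    public static void main(String[] args) {
        System.out.println(repliesSeenByGateway(4, false)); // [reply-1, reply-2, reply-3, reply-4]
        System.out.println(repliesSeenByGateway(4, true));  // [reply-1, reply-3]
    }
}
```

A reply channel carried in the message header belongs to a single request, so it has no other subscribers to compete with.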