Spring 集成 DSL Scatter-Gather async/parallel 执行多个 recipientFlow

Spring integration DSL Scatter-Gather async/parallel execution for multiple recipientFlows

我们正在尝试使用分散-聚集对不同的收件人进行并行调用,并且效果很好。但是除非第一个收件人完成(在 Zipkin 中跟踪),否则第二个收件人流不会开始。有没有一种方法可以使所有收件人异步.. 非常类似于带有执行程序通道的拆分聚合。

public IntegrationFlow flow1() {

        return flow -> flow
                .split().channel(c -> c.executor(Executors.newCachedThreadPool()))
                .scatterGather(
                        scatterer -> scatterer
                                .applySequence(true)
                                .recipientFlow(flow2())
                                .recipientFlow(flow3())
                                .recipientFlow(flow4())
                                .recipientFlow(flow5()),
                        gatherer -> gatherer
                                .outputProcessor(messageGroup -> {
                                    Object request = gatherResponse(messageGroup);
                                    return createResponse(request);
                                }))
                .aggregate();
    }

flow2(),flow3(),flow4() 方法是 InterationFlow 为 return 类型的方法。

示例代码 flow2() :

public IntegrationFlow flow2() {
        return integrationFlowDefinition -> integrationFlowDefinition
                .enrichHeaders(
                        h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
                .transform(ele -> createRequest1(ele))                  
                .wireTap("asyncXMLLogging")
                .handle(wsGateway.applyAsHandler(endpoint1))
                .transform(
                        ele -> response2(ele));
    }

上述 executor channel 确实可以做到这一点。您所有的收件人流程都必须真正从 ExecutorChannel 开始。在您的情况下,您必须将所有这些修改为如下内容:

public IntegrationFlow flow2() {
    return IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))
            .enrichHeaders(
                    h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
            .transform(ele -> createRequest1(ele))                  
            .wireTap("asyncXMLLogging")
            .handle(wsGateway.applyAsHandler(endpoint1))
            .transform(
                    ele -> response2(ele))
            .get();
}

关注IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))。这正是您可以使每个子流异步的方法。

更新

对于没有IntegrationFlow改进子流程的旧Spring集成版本,我们可以这样做:

public IntegrationFlow flow2() {
    return integrationFlowDefinition -> integrationFlowDefinition
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .enrichHeaders(
                    h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
            .transform(ele -> createRequest1(ele))                  
            .wireTap("asyncXMLLogging")
            .handle(wsGateway.applyAsHandler(endpoint1))
            .transform(
                    ele -> response2(ele));
}

这与您在上面的评论中显示的内容相似。