Spring 集成 DSL Scatter-Gather async/parallel 执行多个 recipientFlow
Spring integration DSL Scatter-Gather async/parallel execution for multiple recipientFlows
我们正在尝试使用分散-聚集对不同的收件人进行并行调用,并且效果很好。但是除非第一个收件人完成(在 Zipkin 中跟踪),否则第二个收件人流不会开始。有没有一种方法可以使所有收件人异步.. 非常类似于带有执行程序通道的拆分聚合。
public IntegrationFlow flow1() {
return flow -> flow
.split().channel(c -> c.executor(Executors.newCachedThreadPool()))
.scatterGather(
scatterer -> scatterer
.applySequence(true)
.recipientFlow(flow2())
.recipientFlow(flow3())
.recipientFlow(flow4())
.recipientFlow(flow5()),
gatherer -> gatherer
.outputProcessor(messageGroup -> {
Object request = gatherResponse(messageGroup);
return createResponse(request);
}))
.aggregate();
}
flow2(),flow3(),flow4() 方法是 InterationFlow
为 return 类型的方法。
示例代码 flow2()
:
public IntegrationFlow flow2() {
return integrationFlowDefinition -> integrationFlowDefinition
.enrichHeaders(
h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
.transform(ele -> createRequest1(ele))
.wireTap("asyncXMLLogging")
.handle(wsGateway.applyAsHandler(endpoint1))
.transform(
ele -> response2(ele));
}
上述 executor channel
确实可以做到这一点。您所有的收件人流程都必须真正从 ExecutorChannel
开始。在您的情况下,您必须将所有这些修改为如下内容:
public IntegrationFlow flow2() {
return IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))
.enrichHeaders(
h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
.transform(ele -> createRequest1(ele))
.wireTap("asyncXMLLogging")
.handle(wsGateway.applyAsHandler(endpoint1))
.transform(
ele -> response2(ele))
.get();
}
关注IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))
。这正是您可以使每个子流异步的方法。
更新
对于没有IntegrationFlow
改进子流程的旧Spring集成版本,我们可以这样做:
public IntegrationFlow flow2() {
return integrationFlowDefinition -> integrationFlowDefinition
.channel(c -> c.executor(Executors.newCachedThreadPool()))
.enrichHeaders(
h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
.transform(ele -> createRequest1(ele))
.wireTap("asyncXMLLogging")
.handle(wsGateway.applyAsHandler(endpoint1))
.transform(
ele -> response2(ele));
}
这与您在上面的评论中显示的内容相似。
我们正在尝试使用分散-聚集对不同的收件人进行并行调用,并且效果很好。但是除非第一个收件人完成(在 Zipkin 中跟踪),否则第二个收件人流不会开始。有没有一种方法可以使所有收件人异步.. 非常类似于带有执行程序通道的拆分聚合。
public IntegrationFlow flow1() {
return flow -> flow
.split().channel(c -> c.executor(Executors.newCachedThreadPool()))
.scatterGather(
scatterer -> scatterer
.applySequence(true)
.recipientFlow(flow2())
.recipientFlow(flow3())
.recipientFlow(flow4())
.recipientFlow(flow5()),
gatherer -> gatherer
.outputProcessor(messageGroup -> {
Object request = gatherResponse(messageGroup);
return createResponse(request);
}))
.aggregate();
}
flow2(),flow3(),flow4() 方法是 InterationFlow
为 return 类型的方法。
示例代码 flow2()
:
public IntegrationFlow flow2() {
return integrationFlowDefinition -> integrationFlowDefinition
.enrichHeaders(
h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
.transform(ele -> createRequest1(ele))
.wireTap("asyncXMLLogging")
.handle(wsGateway.applyAsHandler(endpoint1))
.transform(
ele -> response2(ele));
}
上述 executor channel
确实可以做到这一点。您所有的收件人流程都必须真正从 ExecutorChannel
开始。在您的情况下,您必须将所有这些修改为如下内容:
public IntegrationFlow flow2() {
return IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))
.enrichHeaders(
h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
.transform(ele -> createRequest1(ele))
.wireTap("asyncXMLLogging")
.handle(wsGateway.applyAsHandler(endpoint1))
.transform(
ele -> response2(ele))
.get();
}
关注IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))
。这正是您可以使每个子流异步的方法。
更新
对于没有IntegrationFlow
改进子流程的旧Spring集成版本,我们可以这样做:
public IntegrationFlow flow2() {
return integrationFlowDefinition -> integrationFlowDefinition
.channel(c -> c.executor(Executors.newCachedThreadPool()))
.enrichHeaders(
h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
.transform(ele -> createRequest1(ele))
.wireTap("asyncXMLLogging")
.handle(wsGateway.applyAsHandler(endpoint1))
.transform(
ele -> response2(ele));
}
这与您在上面的评论中显示的内容相似。