Spring 集成聚合器与发布订阅者相结合

Spring Integration Aggregator combined with Publish Subscriber

我有关于 Spring 集成的基本问题。我尝试在其他线程中搜索,但对答案不满意。所以这是我的问题。任何提示将不胜感激。

我正在尝试从 4 个不同的来源并行获取数据并聚合数据。

我的理解(根据我之前阅读的线程之一)是,如果分离器的任何输出通道是直接通道,则请求按顺序路由。

但是Splitter的输出通道是receient-list路由器。当我调试时,请求是按顺序路由的。所以经过一些分析,我发现我们可以使用有效载荷类型的路由器来并行处理。

所以我将路由器更改为有效载荷类型的路由器,并且仍然按顺序路由请求。

所以我改变了我的策略,使用了一个发布-订阅者通道,并将所有订阅通道的输出连接到一个聚合器,这样我就可以聚合结果。我能够并行获取数据,但我没有看到响应被路由到聚合。我做错了吗?求推荐!

    <int:publish-subscribe-channel id="PublishSubscriberChannel"
    task-executor="taskExecutor" />

<task:executor id="taskExecutor" pool-size="5" />

<int:service-activator input-channel="PublishSubscriberChannel"
    ref="lookUpService" method="lookupSource1" output-channel="lookupAggregatorChannel" />

<int:service-activator input-channel="PublishSubscriberChannel"
    ref="lookUpService" method="lookupSource2" output-channel="lookupAggregatorChannel" />

<int:service-activator input-channel="PublishSubscriberChannel"
    ref="lookUpService" method="getVehicleInfoWithAdditionalAttributes"
    output-channel="lookupAggregatorChannel" />

<int:service-activator input-channel="PublishSubscriberChannel"
    ref="lookUpService" method="lookupSource4" output-channel="lookupAggregatorChannel" />

所以最后,我发现我做错了什么。 最初,我正在研究 Spring 集成版本 2.0。这是一个旧的应用程序。因此,当我的代码正是问题中给出的代码时,我看不到请求被路由到聚合器。 然后,当我将 Spring 版本升级到 4.3.2 RELEASE 时,我开始收到相关策略可能为空的错误。那是当我发现聚合器需要一个相关 ID 并且如果我有自动添加分离器相关 ID 并且对于发布-订阅通道,我们需要添加 apply-sequence="true" 以添加默认相关 ID可以被聚合器使用。所以从本质上讲,这就是我的代码现在的样子并且可以完美地工作。

<int:publish-subscribe-channel id="PublishSubscriberChannel" apply-sequence="true"
    task-executor="taskExecutor" />

<task:executor id="taskExecutor" pool-size="5" />

<int:service-activator input-channel="PublishSubscriberChannel"
    ref="lookUpService" method="lookupSource1" output-channel="lookupAggregatorChannel" />

<int:service-activator input-channel="PublishSubscriberChannel"
    ref="lookUpService" method="lookupSource2" output-channel="lookupAggregatorChannel" />

<int:service-activator input-channel="PublishSubscriberChannel"
    ref="lookUpService" method="lookupSource3"
    output-channel="lookupAggregatorChannel" />

<int:service-activator input-channel="PublishSubscriberChannel"
    ref="lookUpService" method="lookupSource4" output-channel="lookupAggregatorChannel" />