Spring 集成 - "iterates" 拆分器拆分的消息到底是谁?

Spring Integration - Who exactly "iterates" the splitted messages from a splitter?

关于 SI 中的 splitter 元素,我有几个基本问​​题。

我了解到,要形成自定义的拆分器逻辑,我们需要扩展 AbstractMessageSplitter 并重写 splitMessage 方法。

集合 的拆分消息随后会显示在 splitteroutput-channel 上(假设传入消息没有配置回复通道)。

splitteroutput-channel 可以成为另一个 si 组件的 input-channel。假设组件是一个简单的 service-activator.

手册说集合的“each”消息将由该集合所呈现的下游配置处理。因此在我们的示例中,service-activator 将处理“each”消息。

1) 我的查询是“who”,正好迭代 集合并呈现“each" 来自服务激活器输入通道集合的消息?

2) 假设这里涉及所有直接渠道。现在,如果 service-activator 的 output-channel 配置为 nullChannel,它是否仍会在第一个集合中显示“next”消息消息已成功处理?

3) 假设这里没有 aggregator。我们还假设所有涉及的方法都具有非空 return。 将什么 returned 到调用者线程,将“collection”消息呈现给拆分器的 output-channel ?它会取回服务激活器 return 的 return 类型的集合吗?还是仅取回集合中最后处理的消息的 return?

我希望我的查询足够详尽,以便弄清楚它们。

非常感谢回复。

最好的问候

是的。你问的问题很好!

1) AbstractMessageProducingHandler:

protected void sendOutputs(Object result, Message<?> requestMessage) {
    if (result instanceof Iterable<?> && shouldSplitOutput((Iterable<?>) result)) {
        for (Object o : (Iterable<?>) result) {
            this.produceOutput(o, requestMessage);
        }
    }
    else if (result != null) {
        this.produceOutput(result, requestMessage);
    }
}

正是从这里将每个项目发送到 output-channel

2) 是的,如果所有内容都在DirectChannel 上,下一条消息将在上一条消息完成后才会发送。实际上它的工作方式与原始 java:

相同
 for (Object o : collection) {

 }

注意:在sync这种情况下,如果当前消息处理失败,将不会发送下一条消息。因为 Exception。与原始 Java 完全一样 :-).

3) 嗯。使用原始 Java 它可能看起来像:

split(Collection<?>, Future<?>);  
 --->>
 for (Object o : collection) {
     process(Object, Future<?>)
 }

正是这个过程在 splitter 中完成,甚至在每个 EIP 组件中完成得更好。实际上它们都只能在 push 模式下工作。即使它对回复做了一些事情,它实际上也只是推送到 replyChannel.

因此,如果您不关心回复过程,只有第一条消息会填充对 Future<?> 的回复 - 在我们的例子中发送回复 replyChannel。是的,所有其他人都可以这样做,但回复将被忽略。请参阅 TemporaryReplyChannel 源代码。

请注意,如果您的下游分离器工艺是 async,例如ExecutorChannel 作为 <splitter> 上的 output-channel,无法预测回复中哪条消息有罪。因此,为了让您对发送 Thread 感到满意,您应该围绕这些拆分的消息找出一些足够的算法。甚至 <aggregator>

我认为你最后一个问题的答案取决于目标业务逻辑需求。