Spring 集成 - "iterates" 拆分器拆分的消息到底是谁?
Spring Integration - Who exactly "iterates" the splitted messages from a splitter?
关于 SI 中的 splitter
元素,我有几个基本问题。
我了解到,要形成自定义的拆分器逻辑,我们需要扩展 AbstractMessageSplitter
并重写 splitMessage
方法。
此 集合 的拆分消息随后会显示在 splitter
的 output-channel
上(假设传入消息没有配置回复通道)。
splitter
的 output-channel
可以成为另一个 si 组件的 input-channel
。假设组件是一个简单的 service-activator
.
手册说集合的“each”消息将由该集合所呈现的下游配置处理。因此在我们的示例中,service-activator
将处理“each”消息。
1) 我的查询是“who”,正好迭代 集合并呈现“each" 来自服务激活器输入通道集合的消息?
2) 假设这里涉及所有直接渠道。现在,如果 service-activato
r 的 output-channel
配置为 nullChannel
,它是否仍会在第一个集合中显示“next”消息消息已成功处理?
3) 假设这里没有 aggregator
。我们还假设所有涉及的方法都具有非空 return。
将什么 returned 到调用者线程,将“collection”消息呈现给拆分器的 output-channel
?它会取回服务激活器 return 的 return 类型的集合吗?还是仅取回集合中最后处理的消息的 return?
我希望我的查询足够详尽,以便弄清楚它们。
非常感谢回复。
最好的问候
是的。你问的问题很好!
1) AbstractMessageProducingHandler
:
protected void sendOutputs(Object result, Message<?> requestMessage) {
if (result instanceof Iterable<?> && shouldSplitOutput((Iterable<?>) result)) {
for (Object o : (Iterable<?>) result) {
this.produceOutput(o, requestMessage);
}
}
else if (result != null) {
this.produceOutput(result, requestMessage);
}
}
正是从这里将每个项目发送到 output-channel
。
2) 是的,如果所有内容都在DirectChannel
上,下一条消息将在上一条消息完成后才会发送。实际上它的工作方式与原始 java:
相同
for (Object o : collection) {
}
注意:在sync
这种情况下,如果当前消息处理失败,将不会发送下一条消息。因为 Exception
。与原始 Java 完全一样 :-).
3) 嗯。使用原始 Java 它可能看起来像:
split(Collection<?>, Future<?>);
--->>
for (Object o : collection) {
process(Object, Future<?>)
}
正是这个过程在 splitter
中完成,甚至在每个 EIP 组件中完成得更好。实际上它们都只能在 push 模式下工作。即使它对回复做了一些事情,它实际上也只是推送到 replyChannel
.
因此,如果您不关心回复过程,只有第一条消息会填充对 Future<?>
的回复 - 在我们的例子中发送回复 replyChannel
。是的,所有其他人都可以这样做,但回复将被忽略。请参阅 TemporaryReplyChannel
源代码。
请注意,如果您的下游分离器工艺是 async
,例如ExecutorChannel
作为 <splitter>
上的 output-channel
,无法预测回复中哪条消息有罪。因此,为了让您对发送 Thread 感到满意,您应该围绕这些拆分的消息找出一些足够的算法。甚至 <aggregator>
。
我认为你最后一个问题的答案取决于目标业务逻辑需求。
关于 SI 中的 splitter
元素,我有几个基本问题。
我了解到,要形成自定义的拆分器逻辑,我们需要扩展 AbstractMessageSplitter
并重写 splitMessage
方法。
此 集合 的拆分消息随后会显示在 splitter
的 output-channel
上(假设传入消息没有配置回复通道)。
splitter
的 output-channel
可以成为另一个 si 组件的 input-channel
。假设组件是一个简单的 service-activator
.
手册说集合的“each”消息将由该集合所呈现的下游配置处理。因此在我们的示例中,service-activator
将处理“each”消息。
1) 我的查询是“who”,正好迭代 集合并呈现“each" 来自服务激活器输入通道集合的消息?
2) 假设这里涉及所有直接渠道。现在,如果 service-activato
r 的 output-channel
配置为 nullChannel
,它是否仍会在第一个集合中显示“next”消息消息已成功处理?
3) 假设这里没有 aggregator
。我们还假设所有涉及的方法都具有非空 return。
将什么 returned 到调用者线程,将“collection”消息呈现给拆分器的 output-channel
?它会取回服务激活器 return 的 return 类型的集合吗?还是仅取回集合中最后处理的消息的 return?
我希望我的查询足够详尽,以便弄清楚它们。
非常感谢回复。
最好的问候
是的。你问的问题很好!
1) AbstractMessageProducingHandler
:
protected void sendOutputs(Object result, Message<?> requestMessage) {
if (result instanceof Iterable<?> && shouldSplitOutput((Iterable<?>) result)) {
for (Object o : (Iterable<?>) result) {
this.produceOutput(o, requestMessage);
}
}
else if (result != null) {
this.produceOutput(result, requestMessage);
}
}
正是从这里将每个项目发送到 output-channel
。
2) 是的,如果所有内容都在DirectChannel
上,下一条消息将在上一条消息完成后才会发送。实际上它的工作方式与原始 java:
for (Object o : collection) {
}
注意:在sync
这种情况下,如果当前消息处理失败,将不会发送下一条消息。因为 Exception
。与原始 Java 完全一样 :-).
3) 嗯。使用原始 Java 它可能看起来像:
split(Collection<?>, Future<?>);
--->>
for (Object o : collection) {
process(Object, Future<?>)
}
正是这个过程在 splitter
中完成,甚至在每个 EIP 组件中完成得更好。实际上它们都只能在 push 模式下工作。即使它对回复做了一些事情,它实际上也只是推送到 replyChannel
.
因此,如果您不关心回复过程,只有第一条消息会填充对 Future<?>
的回复 - 在我们的例子中发送回复 replyChannel
。是的,所有其他人都可以这样做,但回复将被忽略。请参阅 TemporaryReplyChannel
源代码。
请注意,如果您的下游分离器工艺是 async
,例如ExecutorChannel
作为 <splitter>
上的 output-channel
,无法预测回复中哪条消息有罪。因此,为了让您对发送 Thread 感到满意,您应该围绕这些拆分的消息找出一些足够的算法。甚至 <aggregator>
。
我认为你最后一个问题的答案取决于目标业务逻辑需求。