Spring 集成 Java DSL -- 聚合器的配置
Spring Integration Java DSL -- Configuration of aggregator
我有一个非常简单的集成流程,其中 RESTful 请求使用发布-订阅渠道转发给两个提供商。来自两个 RESTful 服务的结果随后被聚合到一个数组中。集成流程示意图如下:
@Bean
IntegrationFlow flow() throws Exception {
return IntegrationFlows.from("inputChannel")
.publishSubscribeChannel(s -> s.applySequence(true)
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider1.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
).subscribe(f -> f
.handle(Http.outboundGateway("http://provider2.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class)
)
)
)
.aggregate()
.get();
}
但是,当 运行 我的代码时,生成的数组仅包含 RESTful 服务之一返回的项目。是否缺少任何配置步骤?
更新
考虑到 Artem 的评论,以下版本对应于完整的解决方案。
@Bean
IntegrationFlow flow() throws Exception {
return IntegrationFlows.from("inputChannel-scatter")
.publishSubscribeChannel(s -> s.applySequence(true)
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider1.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
.channel("inputChannel-gather"))
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider2.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
.channel("inputChannel-gather")))
.get();
}
@Bean
IntegrationFlow gatherFlow() {
return IntegrationFlows.from("inputChannel-gather")
.aggregate(a -> a.outputProcessor(g -> new GenericMessage<ItemDTO[]>(
g.getMessages().stream()
.flatMap(m -> Arrays.stream((ItemDTO[]) m.getPayload()))
.collect(Collectors.toList()).toArray(new ItemDTO[0]))))
.get();
}
其实不是这样的。
.aggregate()
是 publishSubscribeChannel
的 第三个 订阅者。
你必须将你的流量切断给其中两个。像这样:
@Bean
public IntegrationFlow publishSubscribeFlow() {
return flow -> flow
.publishSubscribeChannel(s -> s
.applySequence(true)
.subscribe(f -> f
.handle((p, h) -> "Hello")
.channel("publishSubscribeAggregateFlow.input"))
.subscribe(f -> f
.handle((p, h) -> "World!")
.channel("publishSubscribeAggregateFlow.input"))
);
}
@Bean
public IntegrationFlow publishSubscribeAggregateFlow() {
return flow -> flow
.aggregate(a -> a.outputProcessor(g -> g.getMessages()
.stream()
.<String>map(m -> (String) m.getPayload())
.collect(Collectors.joining(" "))))
.channel(c -> c.queue("subscriberAggregateResult"));
}
请注意两位订阅者的 .channel("publishSubscribeAggregateFlow.input")
使用情况。
老实说是一点publish-subscribe
。如果我们要聚合它们,我们必须知道将所有订阅者的结果发送到哪里。
你的用例让我想起了 Scatter-Gather EIP 模式。
我们还没有在 DSL 中实现它。
请随时就此事提出 GH issue,我们将尝试在即将推出的 1.2
版本中处理它。
更新
关于此事的GH问题:https://github.com/spring-projects/spring-integration-java-dsl/issues/75
我有一个非常简单的集成流程,其中 RESTful 请求使用发布-订阅渠道转发给两个提供商。来自两个 RESTful 服务的结果随后被聚合到一个数组中。集成流程示意图如下:
@Bean
IntegrationFlow flow() throws Exception {
return IntegrationFlows.from("inputChannel")
.publishSubscribeChannel(s -> s.applySequence(true)
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider1.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
).subscribe(f -> f
.handle(Http.outboundGateway("http://provider2.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class)
)
)
)
.aggregate()
.get();
}
但是,当 运行 我的代码时,生成的数组仅包含 RESTful 服务之一返回的项目。是否缺少任何配置步骤?
更新
考虑到 Artem 的评论,以下版本对应于完整的解决方案。
@Bean
IntegrationFlow flow() throws Exception {
return IntegrationFlows.from("inputChannel-scatter")
.publishSubscribeChannel(s -> s.applySequence(true)
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider1.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
.channel("inputChannel-gather"))
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider2.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
.channel("inputChannel-gather")))
.get();
}
@Bean
IntegrationFlow gatherFlow() {
return IntegrationFlows.from("inputChannel-gather")
.aggregate(a -> a.outputProcessor(g -> new GenericMessage<ItemDTO[]>(
g.getMessages().stream()
.flatMap(m -> Arrays.stream((ItemDTO[]) m.getPayload()))
.collect(Collectors.toList()).toArray(new ItemDTO[0]))))
.get();
}
其实不是这样的。
.aggregate()
是 publishSubscribeChannel
的 第三个 订阅者。
你必须将你的流量切断给其中两个。像这样:
@Bean
public IntegrationFlow publishSubscribeFlow() {
return flow -> flow
.publishSubscribeChannel(s -> s
.applySequence(true)
.subscribe(f -> f
.handle((p, h) -> "Hello")
.channel("publishSubscribeAggregateFlow.input"))
.subscribe(f -> f
.handle((p, h) -> "World!")
.channel("publishSubscribeAggregateFlow.input"))
);
}
@Bean
public IntegrationFlow publishSubscribeAggregateFlow() {
return flow -> flow
.aggregate(a -> a.outputProcessor(g -> g.getMessages()
.stream()
.<String>map(m -> (String) m.getPayload())
.collect(Collectors.joining(" "))))
.channel(c -> c.queue("subscriberAggregateResult"));
}
请注意两位订阅者的 .channel("publishSubscribeAggregateFlow.input")
使用情况。
老实说是一点publish-subscribe
。如果我们要聚合它们,我们必须知道将所有订阅者的结果发送到哪里。
你的用例让我想起了 Scatter-Gather EIP 模式。
我们还没有在 DSL 中实现它。
请随时就此事提出 GH issue,我们将尝试在即将推出的 1.2
版本中处理它。
更新
关于此事的GH问题:https://github.com/spring-projects/spring-integration-java-dsl/issues/75