有没有办法使用 DSL 来分叉 Spring IntegrationFlow?
Is there a way to fork the Spring IntegrationFlow using DSL?
我想做类似下面这样的事情,其中网关的有效负载(payload)是一个字符串,serviceA 和 serviceB 都返回列表。
// Branch A: hand the message to serviceA, then pass the resulting payload through unchanged.
final IntegrationFlow flowA = flow -> flow
.handle(serviceA)
.handle((payload, headers) -> payload); // List<Object>
// Branch B: same shape as branch A, but backed by serviceB.
final IntegrationFlow flowB = flow -> flow
.handle(serviceB)
.handle((payload, headers) -> payload); // List<Object>
// Main flow: start from the gateway, run both branches, then continue with the combined result.
// NOTE(review): forkAndMerge(...) is the operation being asked for here, not something this
// text shows to exist in the DSL; the answer that follows uses scatterGather() instead.
return IntegrationFlows
.from(myGateway) // String payload
.forkAndMerge(flowA, flowB, executor)
.handle((payload, headers) -> payload)
.get();
是否可以将流程分成两部分,然后收集结果?拆分器和聚合器的大多数示例都涉及拆分列表。
请查看 .scatterGather() 的各个变体。
ScatterGatherer 的主要文档请参见 Spring Integration 参考手册中的 Scatter-Gather 章节。
编辑
示例:
/**
 * Minimal Spring Boot application demonstrating a scatter-gather integration flow.
 *
 * <p>A polled source emits the constant payload {@code "foo"}; the payload is scattered to two
 * recipient sub-flows, each fed through an executor channel, and the gathered result is
 * printed to standard output.
 */
@SpringBootApplication
public class So63605348Application {

    private static final Logger log = LoggerFactory.getLogger(So63605348Application.class);

    /** Boots the Spring application context. */
    public static void main(String[] args) {
        SpringApplication.run(So63605348Application.class, args);
    }

    /**
     * Thread pool backing the executor channels of the recipient sub-flows.
     *
     * @return a {@link ThreadPoolTaskExecutor} with a core pool size of 2
     */
    @Bean
    public TaskExecutor exec() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(2);
        return pool;
    }

    /**
     * Builds the scatter-gather flow.
     *
     * @param exec executor used by the channel in front of each recipient sub-flow
     * @return the assembled flow
     */
    @Bean
    IntegrationFlow flow(TaskExecutor exec) {
        return IntegrationFlows
                // Poll a constant "foo" payload with a fixed delay of 5000.
                .from(() -> "foo", source -> source.poller(Pollers.fixedDelay(5000)))
                .scatterGather(scatterer -> scatterer
                        .applySequence(true)
                        // First recipient: logs the payload and replies with its upper-case form.
                        .recipientFlow(upper -> upper
                                .channel(channels -> channels.executor(exec))
                                .<String>handle((payload, headers) -> {
                                    log.info(payload.toString());
                                    return payload.toUpperCase();
                                }))
                        // Second recipient: logs the payload and replies with it doubled.
                        .recipientFlow(doubled -> doubled
                                .channel(channels -> channels.executor(exec))
                                .<String>handle((payload, headers) -> {
                                    log.info(payload.toString());
                                    return payload + payload;
                                })))
                // Print the gathered message.
                .handle(System.out::println)
                .get();
    }
}
结果
2020-08-26 17:33:56.769 INFO 50829 --- [ exec-1] com.example.demo.So63605348Application : foo
2020-08-26 17:33:56.769 INFO 50829 --- [ exec-2] com.example.demo.So63605348Application : foo
GenericMessage [payload=[foofoo, FOO], headers=...
编辑 2
如果您不想嵌套子流程,可以把它们提取为独立的 bean...
/**
 * Scatter-gather flow whose recipients are the separately declared {@code flow2()} and
 * {@code flow3()} flows instead of inline sub-flows.
 *
 * <p>NOTE(review): the {@code exec} parameter is not referenced in this body, and
 * {@code flow2()}/{@code flow3()} declare no executor channel, so the branches presumably run
 * on the calling thread. Confirm this is intended.
 */
@Bean
IntegrationFlow flow(TaskExecutor exec) {
    return IntegrationFlows
            // Poll a constant "foo" payload with a fixed delay of 5000.
            .from(() -> "foo", source -> source.poller(Pollers.fixedDelay(5000)))
            .scatterGather(scatterer -> scatterer
                    .applySequence(true)
                    .recipientFlow(flow2())
                    .recipientFlow(flow3()))
            // Print the gathered message.
            .handle(System.out::println)
            .get();
}
/** Flow that logs the String payload and replies with the payload concatenated to itself. */
@Bean
IntegrationFlow flow2() {
    return definition -> definition.<String>handle((payload, headers) -> {
        log.info(payload.toString());
        return payload + payload;
    });
}
/** Flow that logs the String payload and replies with its upper-case form. */
@Bean
IntegrationFlow flow3() {
    return definition -> definition.<String>handle((payload, headers) -> {
        log.info(payload.toString());
        return payload.toUpperCase();
    });
}
或者您可以使用发布/订阅(pub/sub)通道的变体...
/**
 * Scatter-gather flow that scatters through the {@code pubSub()} channel rather than through
 * explicitly listed recipient flows.
 */
@Bean
IntegrationFlow flow() {
    return IntegrationFlows
            // Poll a constant "foo" payload with a fixed delay of 5000.
            .from(() -> "foo", source -> source.poller(Pollers.fixedDelay(5000)))
            .scatterGather(pubSub())
            // Print the gathered message.
            .handle(System.out::println)
            .get();
}
/**
 * Publish-subscribe channel built on the {@code exec()} executor, with apply-sequence enabled.
 *
 * @return the configured channel
 */
@Bean
PublishSubscribeChannel pubSub() {
    PublishSubscribeChannel channel = new PublishSubscribeChannel(exec());
    // Presumably needed so the gathering side can correlate the replies; confirm.
    channel.setApplySequence(true);
    return channel;
}
/**
 * Flow starting from the channel named {@code "pubSub"}: logs the String payload and replies
 * with the payload concatenated to itself.
 */
@Bean
IntegrationFlow flow2() {
    return IntegrationFlows
            .from("pubSub")
            .<String>handle((payload, headers) -> {
                log.info(payload.toString());
                return payload + payload;
            })
            .get();
}
/**
 * Flow starting from the channel named {@code "pubSub"}: logs the String payload and replies
 * with its upper-case form.
 */
@Bean
IntegrationFlow flow3() {
    return IntegrationFlows
            .from("pubSub")
            .<String>handle((payload, headers) -> {
                log.info(payload.toString());
                return payload.toUpperCase();
            })
            .get();
}
我想做类似下面这样的事情,其中网关的有效负载(payload)是一个字符串,serviceA 和 serviceB 都返回列表。
// Branch A: hand the message to serviceA, then pass the resulting payload through unchanged.
final IntegrationFlow flowA = flow -> flow
.handle(serviceA)
.handle((payload, headers) -> payload); // List<Object>
// Branch B: same shape as branch A, but backed by serviceB.
final IntegrationFlow flowB = flow -> flow
.handle(serviceB)
.handle((payload, headers) -> payload); // List<Object>
// Main flow: start from the gateway, run both branches, then continue with the combined result.
// NOTE(review): forkAndMerge(...) is the operation being asked for here, not something this
// text shows to exist in the DSL; the answer that follows uses scatterGather() instead.
return IntegrationFlows
.from(myGateway) // String payload
.forkAndMerge(flowA, flowB, executor)
.handle((payload, headers) -> payload)
.get();
是否可以将流程分成两部分,然后收集结果?拆分器和聚合器的大多数示例都涉及拆分列表。
请查看 .scatterGather() 的各个变体。
ScatterGatherer 的主要文档请参见 Spring Integration 参考手册中的 Scatter-Gather 章节。
编辑
示例:
/**
 * Minimal Spring Boot application demonstrating a scatter-gather integration flow.
 *
 * <p>A polled source emits the constant payload {@code "foo"}; the payload is scattered to two
 * recipient sub-flows, each fed through an executor channel, and the gathered result is
 * printed to standard output.
 */
@SpringBootApplication
public class So63605348Application {

    private static final Logger log = LoggerFactory.getLogger(So63605348Application.class);

    /** Boots the Spring application context. */
    public static void main(String[] args) {
        SpringApplication.run(So63605348Application.class, args);
    }

    /**
     * Thread pool backing the executor channels of the recipient sub-flows.
     *
     * @return a {@link ThreadPoolTaskExecutor} with a core pool size of 2
     */
    @Bean
    public TaskExecutor exec() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(2);
        return pool;
    }

    /**
     * Builds the scatter-gather flow.
     *
     * @param exec executor used by the channel in front of each recipient sub-flow
     * @return the assembled flow
     */
    @Bean
    IntegrationFlow flow(TaskExecutor exec) {
        return IntegrationFlows
                // Poll a constant "foo" payload with a fixed delay of 5000.
                .from(() -> "foo", source -> source.poller(Pollers.fixedDelay(5000)))
                .scatterGather(scatterer -> scatterer
                        .applySequence(true)
                        // First recipient: logs the payload and replies with its upper-case form.
                        .recipientFlow(upper -> upper
                                .channel(channels -> channels.executor(exec))
                                .<String>handle((payload, headers) -> {
                                    log.info(payload.toString());
                                    return payload.toUpperCase();
                                }))
                        // Second recipient: logs the payload and replies with it doubled.
                        .recipientFlow(doubled -> doubled
                                .channel(channels -> channels.executor(exec))
                                .<String>handle((payload, headers) -> {
                                    log.info(payload.toString());
                                    return payload + payload;
                                })))
                // Print the gathered message.
                .handle(System.out::println)
                .get();
    }
}
结果
2020-08-26 17:33:56.769 INFO 50829 --- [ exec-1] com.example.demo.So63605348Application : foo
2020-08-26 17:33:56.769 INFO 50829 --- [ exec-2] com.example.demo.So63605348Application : foo
GenericMessage [payload=[foofoo, FOO], headers=...
编辑 2
如果您不想嵌套子流程,可以把它们提取为独立的 bean...
/**
 * Scatter-gather flow whose recipients are the separately declared {@code flow2()} and
 * {@code flow3()} flows instead of inline sub-flows.
 *
 * <p>NOTE(review): the {@code exec} parameter is not referenced in this body, and
 * {@code flow2()}/{@code flow3()} declare no executor channel, so the branches presumably run
 * on the calling thread. Confirm this is intended.
 */
@Bean
IntegrationFlow flow(TaskExecutor exec) {
    return IntegrationFlows
            // Poll a constant "foo" payload with a fixed delay of 5000.
            .from(() -> "foo", source -> source.poller(Pollers.fixedDelay(5000)))
            .scatterGather(scatterer -> scatterer
                    .applySequence(true)
                    .recipientFlow(flow2())
                    .recipientFlow(flow3()))
            // Print the gathered message.
            .handle(System.out::println)
            .get();
}
/** Flow that logs the String payload and replies with the payload concatenated to itself. */
@Bean
IntegrationFlow flow2() {
    return definition -> definition.<String>handle((payload, headers) -> {
        log.info(payload.toString());
        return payload + payload;
    });
}
/** Flow that logs the String payload and replies with its upper-case form. */
@Bean
IntegrationFlow flow3() {
    return definition -> definition.<String>handle((payload, headers) -> {
        log.info(payload.toString());
        return payload.toUpperCase();
    });
}
或者您可以使用发布/订阅(pub/sub)通道的变体...
/**
 * Scatter-gather flow that scatters through the {@code pubSub()} channel rather than through
 * explicitly listed recipient flows.
 */
@Bean
IntegrationFlow flow() {
    return IntegrationFlows
            // Poll a constant "foo" payload with a fixed delay of 5000.
            .from(() -> "foo", source -> source.poller(Pollers.fixedDelay(5000)))
            .scatterGather(pubSub())
            // Print the gathered message.
            .handle(System.out::println)
            .get();
}
/**
 * Publish-subscribe channel built on the {@code exec()} executor, with apply-sequence enabled.
 *
 * @return the configured channel
 */
@Bean
PublishSubscribeChannel pubSub() {
    PublishSubscribeChannel channel = new PublishSubscribeChannel(exec());
    // Presumably needed so the gathering side can correlate the replies; confirm.
    channel.setApplySequence(true);
    return channel;
}
/**
 * Flow starting from the channel named {@code "pubSub"}: logs the String payload and replies
 * with the payload concatenated to itself.
 */
@Bean
IntegrationFlow flow2() {
    return IntegrationFlows
            .from("pubSub")
            .<String>handle((payload, headers) -> {
                log.info(payload.toString());
                return payload + payload;
            })
            .get();
}
/**
 * Flow starting from the channel named {@code "pubSub"}: logs the String payload and replies
 * with its upper-case form.
 */
@Bean
IntegrationFlow flow3() {
    return IntegrationFlows
            .from("pubSub")
            .<String>handle((payload, headers) -> {
                log.info(payload.toString());
                return payload.toUpperCase();
            })
            .get();
}