Spring Integration 5.0 + Project Reactor:控制线程
Spring Integration 5.0 + Project Reactor: controlling threads
的跟进问题
在使用 Flux
+ split()
+ FluxMessageChannel
.
时,我无法在并行线程中将集成处理程序设为 运行
考虑以下片段:
// ...
.handle(message -> Flux.range(0, 10)
.doOnNext(i -> LOG.info("> " + i))
.subscribeOn(Schedulers.parallel()))
.split()
.channel(new FluxMessageChannel())
.handle(message -> LOG.info(" -> " + message.getPayload())))
// ...
所有日志在一个线程中输出:
[ parallel-1] d.a.Application : > 0
[ parallel-1] d.a.Application : -> 0
[ parallel-1] d.a.Application : > 1
[ parallel-1] d.a.Application : -> 1
[ parallel-1] d.a.Application : > 2
[ parallel-1] d.a.Application : -> 2
[ parallel-1] d.a.Application : > 3
[ parallel-1] d.a.Application : -> 3
[ parallel-1] d.a.Application : > 4
[ parallel-1] d.a.Application : -> 4
[ parallel-1] d.a.Application : > 5
[ parallel-1] d.a.Application : -> 5
[ parallel-1] d.a.Application : > 6
[ parallel-1] d.a.Application : -> 6
[ parallel-1] d.a.Application : > 7
[ parallel-1] d.a.Application : -> 7
[ parallel-1] d.a.Application : > 8
[ parallel-1] d.a.Application : -> 8
[ parallel-1] d.a.Application : > 9
[ parallel-1] d.a.Application : -> 9
如何强制在多线程中处理?
我试过在 Flux
上使用 .parallel().runOn()
,但这只会使获取数据并行化,但实际处理仍在一个线程上 运行s。
我也在 Flux
上试过 .publishOn(Schedulers.parallel())
但没有效果。
并且将 ExecutorChannel
或带有执行程序的 Poller
添加到处理程序也没有帮助。
这有一些技巧:
.channel(new FluxMessageChannel())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.handle(message -> LOG.info(" -> " + message.getPayload())))
FluxMessageChannel
使用的那些消息将与其他 ExecutorChannel
并行使用。
我认为您要问的是使上述 FluxMessageChannel
可配置的功能请求。这样的subscribeOn/publishOn
等可以在那里配置。
欢迎就此事提出 JIRA!
在使用 Flux
+ split()
+ FluxMessageChannel
.
考虑以下片段:
// ...
.handle(message -> Flux.range(0, 10)
.doOnNext(i -> LOG.info("> " + i))
.subscribeOn(Schedulers.parallel()))
.split()
.channel(new FluxMessageChannel())
.handle(message -> LOG.info(" -> " + message.getPayload())))
// ...
所有日志在一个线程中输出:
[ parallel-1] d.a.Application : > 0
[ parallel-1] d.a.Application : -> 0
[ parallel-1] d.a.Application : > 1
[ parallel-1] d.a.Application : -> 1
[ parallel-1] d.a.Application : > 2
[ parallel-1] d.a.Application : -> 2
[ parallel-1] d.a.Application : > 3
[ parallel-1] d.a.Application : -> 3
[ parallel-1] d.a.Application : > 4
[ parallel-1] d.a.Application : -> 4
[ parallel-1] d.a.Application : > 5
[ parallel-1] d.a.Application : -> 5
[ parallel-1] d.a.Application : > 6
[ parallel-1] d.a.Application : -> 6
[ parallel-1] d.a.Application : > 7
[ parallel-1] d.a.Application : -> 7
[ parallel-1] d.a.Application : > 8
[ parallel-1] d.a.Application : -> 8
[ parallel-1] d.a.Application : > 9
[ parallel-1] d.a.Application : -> 9
如何强制在多线程中处理?
我试过在 Flux
上使用 .parallel().runOn()
,但这只会使获取数据并行化,但实际处理仍在一个线程上 运行s。
我也在 Flux
上试过 .publishOn(Schedulers.parallel())
但没有效果。
并且将 ExecutorChannel
或带有执行程序的 Poller
添加到处理程序也没有帮助。
这有一些技巧:
.channel(new FluxMessageChannel())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.handle(message -> LOG.info(" -> " + message.getPayload())))
FluxMessageChannel
使用的那些消息将与其他 ExecutorChannel
并行使用。
我认为您要问的是使上述 FluxMessageChannel
可配置的功能请求。这样的subscribeOn/publishOn
等可以在那里配置。
欢迎就此事提出 JIRA!