Spring Integration 5.0 + Project Reactor:控制线程

Spring Integration 5.0 + Project Reactor: controlling threads

的跟进问题

在使用 Flux + split() + FluxMessageChannel.

时,我无法在并行线程中将集成处理程序设为 运行

考虑以下片段:

// ...
.handle(message -> Flux.range(0, 10)
    .doOnNext(i -> LOG.info("> " + i))
    .subscribeOn(Schedulers.parallel()))
.split()
.channel(new FluxMessageChannel())
.handle(message -> LOG.info(" -> " + message.getPayload())))
// ...

所有日志在一个线程中输出:

[     parallel-1] d.a.Application    : > 0
[     parallel-1] d.a.Application    :  -> 0
[     parallel-1] d.a.Application    : > 1
[     parallel-1] d.a.Application    :  -> 1
[     parallel-1] d.a.Application    : > 2
[     parallel-1] d.a.Application    :  -> 2
[     parallel-1] d.a.Application    : > 3
[     parallel-1] d.a.Application    :  -> 3
[     parallel-1] d.a.Application    : > 4
[     parallel-1] d.a.Application    :  -> 4
[     parallel-1] d.a.Application    : > 5
[     parallel-1] d.a.Application    :  -> 5
[     parallel-1] d.a.Application    : > 6
[     parallel-1] d.a.Application    :  -> 6
[     parallel-1] d.a.Application    : > 7
[     parallel-1] d.a.Application    :  -> 7
[     parallel-1] d.a.Application    : > 8
[     parallel-1] d.a.Application    :  -> 8
[     parallel-1] d.a.Application    : > 9
[     parallel-1] d.a.Application    :  -> 9

如何强制在多线程中处理?

我试过在 Flux 上使用 .parallel().runOn(),但这只会使获取数据并行化,但实际处理仍在一个线程上 运行s。

我也在 Flux 上试过 .publishOn(Schedulers.parallel()) 但没有效果。

并且将 ExecutorChannel 或带有执行程序的 Poller 添加到处理程序也没有帮助。

这有一些技巧:

.channel(new FluxMessageChannel())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.handle(message -> LOG.info(" -> " + message.getPayload())))

FluxMessageChannel 使用的那些消息将与其他 ExecutorChannel 并行使用。

我认为您要问的是使上述 FluxMessageChannel 可配置的功能请求。这样的subscribeOn/publishOn等可以在那里配置。

欢迎就此事提出 JIRA