阻塞 I/O 任务的 ParallelFlux 与 flatMap()
ParallelFlux vs flatMap() for a Blocking I/O task
我有一个 Project Reactor 链,其中包含一个阻塞任务(一个网络调用,我们需要等待响应)。我想 运行 同时执行多个阻塞任务。
似乎可以使用 ParallelFlux 或 flatMap(),基本示例:
Flux.just(1)
.repeat(10)
.parallel(3)
.runOn(Schedulers.elastic())
.doOnNext(i -> blockingTask())
.sequential()
.subscribe()
或
Flux.just(1)
.repeat(10)
.flatMap(i -> Mono.fromCallable(() -> {blockingTask(); return i;}).subscribeOn(Schedulers.elastic()), 3)
.subscribe();
这两种技术的优点是什么?一个比另一个更受欢迎吗?还有其他选择吗?
parallel
专为性能目的的任务并行化而设计,并在 "rails" 或 "groups" 之间分派工作,每个任务都从 [=11] 获得自己的执行上下文=] 你传给 runOn
。简而言之,如果您进行 CPU 密集型工作,它将让您所有的 CPU 核心都投入工作。但是你正在做 I/O 绑定工作...
因此,在您的情况下,flatMap
是更好的选择。使用 flatMap
进行并行化更多的是关于编排。
如果您不算 flatMap
与 flatMapSequential
略有不同的风格(concatMap
确实不允许并行化),那么这些几乎就是 2 个备选方案.
我有一个 Project Reactor 链,其中包含一个阻塞任务(一个网络调用,我们需要等待响应)。我想 运行 同时执行多个阻塞任务。
似乎可以使用 ParallelFlux 或 flatMap(),基本示例:
Flux.just(1)
.repeat(10)
.parallel(3)
.runOn(Schedulers.elastic())
.doOnNext(i -> blockingTask())
.sequential()
.subscribe()
或
Flux.just(1)
.repeat(10)
.flatMap(i -> Mono.fromCallable(() -> {blockingTask(); return i;}).subscribeOn(Schedulers.elastic()), 3)
.subscribe();
这两种技术的优点是什么?一个比另一个更受欢迎吗?还有其他选择吗?
parallel
专为性能目的的任务并行化而设计,并在 "rails" 或 "groups" 之间分派工作,每个任务都从 [=11] 获得自己的执行上下文=] 你传给 runOn
。简而言之,如果您进行 CPU 密集型工作,它将让您所有的 CPU 核心都投入工作。但是你正在做 I/O 绑定工作...
因此,在您的情况下,flatMap
是更好的选择。使用 flatMap
进行并行化更多的是关于编排。
如果您不算 flatMap
与 flatMapSequential
略有不同的风格(concatMap
确实不允许并行化),那么这些几乎就是 2 个备选方案.