当当前正在执行的线程进入等待状态时,为什么 runOn() 方法不在池中的下一个可用线程上执行映射运算符?

Why does not the runOn() method execute map operator on the next available thread from the pool when the current executing threads go to wait state?

我正在尝试在 4 核机器上执行以下代码。我在池中和映射运算符中有 5 个线程,我让执行线程休眠几秒钟。

我预计核心会将正在执行的线程置于睡眠状态,并且当下一个事件可用时,应该对线程池中的下一个可用线程执行映射操作,但这不是我看到的行为。

我看到池中的 4 个线程继续等待 13 秒,只有在等待完成后才处理下一个事件。

当线程进入等待状态时,为什么 runOn() 方法不在池中的下一个可用线程上执行 map 运算符?

我正在使用 reactor-core 版本 '3.0.7.RELEASE'

CountDownLatch latch = new CountDownLatch(10);

ExecutorService executorService = Executors.newFixedThreadPool(5);

Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
flux.parallel()
.runOn(Schedulers.fromExecutorService(executorService))
.map(l -> {
    Logger.log(ReactorParallelTest.class, "map1", "inside run waiting for 13 seconds");
    try {
        Thread.sleep(13000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    Logger.log(ReactorParallelTest.class, "map1", "l=" + l);
    latch.countDown();
    return l;
}).subscribe(l -> {
    Logger.log(ReactorParallelTest.class, "onNext", "l=" + l);
}, error -> System.err.println(error),
        () -> {
            Logger.log(ReactorParallelTest.class, "onComplete", "inside complete.");
            executorService.shutdown();
        });

try {
    latch.await(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
}

您正在使用此代码阻止所有 rails。 4 rails 将被启动(CPU 数量),它们将立即从源中各请求 1 个元素。因为你在完成后立即阻塞 map,rail 不能从上游请求更多,所以实际上你一次只能得到 4 个元素,阻塞,获取更多,阻塞......并行性更受限制比线程池的容量。如果你想充分利用所有线程,请执行.parallel(5)(与线程池相同的配置)。

附带说明一下,ParallelFlux 中的 subscribe(lambda) 将为每个轨道调用 onComplete 回调。如果要合并回单个序列(并且单个完整),请在 .subscribe.

之前使用 .sequential()