当当前正在执行的线程进入等待状态时,为什么 runOn() 方法不在池中的下一个可用线程上执行映射运算符?
Why does not the runOn() method execute map operator on the next available thread from the pool when the current executing threads go to wait state?
我正在尝试在 4 核机器上执行以下代码。我在池中和映射运算符中有 5 个线程,我让执行线程休眠几秒钟。
我预计核心会将正在执行的线程置于睡眠状态,并且当下一个事件可用时,应该对线程池中的下一个可用线程执行映射操作,但这不是我看到的行为。
我看到池中的 4 个线程继续等待 13 秒,只有在等待完成后才处理下一个事件。
当线程进入等待状态时,为什么 runOn()
方法不在池中的下一个可用线程上执行 map 运算符?
我正在使用 reactor-core 版本 '3.0.7.RELEASE'
CountDownLatch latch = new CountDownLatch(10);
ExecutorService executorService = Executors.newFixedThreadPool(5);
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
flux.parallel()
.runOn(Schedulers.fromExecutorService(executorService))
.map(l -> {
Logger.log(ReactorParallelTest.class, "map1", "inside run waiting for 13 seconds");
try {
Thread.sleep(13000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Logger.log(ReactorParallelTest.class, "map1", "l=" + l);
latch.countDown();
return l;
}).subscribe(l -> {
Logger.log(ReactorParallelTest.class, "onNext", "l=" + l);
}, error -> System.err.println(error),
() -> {
Logger.log(ReactorParallelTest.class, "onComplete", "inside complete.");
executorService.shutdown();
});
try {
latch.await(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
您正在使用此代码阻止所有 rails。 4 rails 将被启动(CPU 数量),它们将立即从源中各请求 1 个元素。因为你在完成后立即阻塞 map
,rail 不能从上游请求更多,所以实际上你一次只能得到 4 个元素,阻塞,获取更多,阻塞......并行性更受限制比线程池的容量。如果你想充分利用所有线程,请执行.parallel(5)
(与线程池相同的配置)。
附带说明一下,ParallelFlux
中的 subscribe(lambda)
将为每个轨道调用 onComplete
回调。如果要合并回单个序列(并且单个完整),请在 .subscribe
.
之前使用 .sequential()
我正在尝试在 4 核机器上执行以下代码。我在池中和映射运算符中有 5 个线程,我让执行线程休眠几秒钟。
我预计核心会将正在执行的线程置于睡眠状态,并且当下一个事件可用时,应该对线程池中的下一个可用线程执行映射操作,但这不是我看到的行为。
我看到池中的 4 个线程继续等待 13 秒,只有在等待完成后才处理下一个事件。
当线程进入等待状态时,为什么 runOn()
方法不在池中的下一个可用线程上执行 map 运算符?
我正在使用 reactor-core 版本 '3.0.7.RELEASE'
CountDownLatch latch = new CountDownLatch(10);
ExecutorService executorService = Executors.newFixedThreadPool(5);
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
flux.parallel()
.runOn(Schedulers.fromExecutorService(executorService))
.map(l -> {
Logger.log(ReactorParallelTest.class, "map1", "inside run waiting for 13 seconds");
try {
Thread.sleep(13000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Logger.log(ReactorParallelTest.class, "map1", "l=" + l);
latch.countDown();
return l;
}).subscribe(l -> {
Logger.log(ReactorParallelTest.class, "onNext", "l=" + l);
}, error -> System.err.println(error),
() -> {
Logger.log(ReactorParallelTest.class, "onComplete", "inside complete.");
executorService.shutdown();
});
try {
latch.await(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
您正在使用此代码阻止所有 rails。 4 rails 将被启动(CPU 数量),它们将立即从源中各请求 1 个元素。因为你在完成后立即阻塞 map
,rail 不能从上游请求更多,所以实际上你一次只能得到 4 个元素,阻塞,获取更多,阻塞......并行性更受限制比线程池的容量。如果你想充分利用所有线程,请执行.parallel(5)
(与线程池相同的配置)。
附带说明一下,ParallelFlux
中的 subscribe(lambda)
将为每个轨道调用 onComplete
回调。如果要合并回单个序列(并且单个完整),请在 .subscribe
.
.sequential()