ForkJoinPool 大小动态增加?
ForkJoinPool size increasing dynamically?
相关:
我正在研究通过 parallelStream 和 CompletableFutures 并行化网络调用的不同方式。因此,我遇到过 java 的 parallelStream 使用的 ForkJoinPool.commonPool() 的大小动态增长的情况,从 ~ #Cores 到最大值 64。
Java 详情:
$ java -version
openjdk version "11.0.10" 2021-01-19
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.10+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.10+9, mixed mode)
显示此类行为的代码如下(完整的可执行代码 here)
public static int loops = 100;
private static long sleepTimeMs = 1000;
private static ExecutorService customPool = Executors.newFixedThreadPool(loops);
// this method shows dynamic increase in pool size
public static void m1() {
Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
.parallel()
.map(number -> CompletableFuture.supplyAsync(
() -> DummyProcess.slowNetworkCall(number), customPool))
.map(CompletableFuture::join)
.mapToLong(Long::longValue)
.summaryStatistics();
}
// this method shows static pool size
public static void m2() {
Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops)
.parallel()
.map(DummyProcess::slowNetworkCall) // in this call, parallelism/poolsize stays constant 11
.summaryStatistics();
}
public static Long slowNetworkCall(Long i) {
Instant start = Instant.now();
// starts with 11 (#cores in my laptop = 12), goes upto 64
log.info(" {} going to sleep. poolsize: {}", i, ForkJoinPool.commonPool().getPoolSize());
try {
TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(" {} woke up..", i);
return Duration.between(start, Instant.now()).toMillis();
}
示例输出:
16:07:17.443 [pool-2-thread-7] INFO generalworks.parallelism.DummyProcess - 44 going to sleep. poolsize: 11
16:07:17.443 [pool-2-thread-9] INFO generalworks.parallelism.DummyProcess - 7 going to sleep. poolsize: 12
16:07:17.443 [pool-2-thread-4] INFO generalworks.parallelism.DummyProcess - 6 going to sleep. poolsize: 12
16:07:17.444 [pool-2-thread-13] INFO generalworks.parallelism.DummyProcess - 82 going to sleep. poolsize: 13
16:07:17.444 [pool-2-thread-14] INFO generalworks.parallelism.DummyProcess - 26 going to sleep. poolsize: 14
16:07:17.444 [pool-2-thread-15] INFO generalworks.parallelism.DummyProcess - 96 going to sleep. poolsize: 15
16:07:17.445 [pool-2-thread-16] INFO generalworks.parallelism.DummyProcess - 78 going to sleep. poolsize: 16
.
.
16:07:18.460 [pool-2-thread-79] INFO generalworks.parallelism.DummyProcess - 2 going to sleep. poolsize: 64
16:07:18.460 [pool-2-thread-71] INFO generalworks.parallelism.DummyProcess - 36 going to sleep. poolsize: 64
16:07:18.460 [pool-2-thread-74] INFO generalworks.parallelism.DummyProcess - 77 going to sleep. poolsize: 64
16:07:18.461 [pool-2-thread-83] INFO generalworks.parallelism.DummyProcess - 86 going to sleep. poolsize: 64
我知道公共池中的线程数,即 parallelism
是基于可用核心的最大数量,所以由于我的笔记本电脑有 12 个核心,我得到的并行度为 11 .但是我不明白为什么它在一种方法中一直在爬升,而在另一种方法中,它的大小保持不变
我相信你的答案是 here(ForkJoinPool
实现):
if ((wt = q.owner) != null &&
((ts = wt.getState()) == Thread.State.BLOCKED ||
ts == Thread.State.WAITING))
++bc; // worker is blocking
在你的代码的一个版本中,你在 Thread.sleep
上阻塞,这将线程置于 TIMED_WAITING
状态,而在另一个版本中,你在 CompletableFuture.join()
上阻塞,这使它进入 WAITING
状态。该实现区分了这些并展示了您观察到的不同行为。
CompletableFuture
里面也有特殊的代码,让它和ForkJoinPool
配合,防止在等待结果时饿死:
if (Thread.currentThread() instanceof ForkJoinWorkerThread)
ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
与您首先测试它的原因相关的结论:Thread.sleep()
没有正确模拟长网络调用。如果您执行了实际操作或其他一些阻塞操作,它将通过扩展池进行补偿。
提供的答案完全正确。根据 ForJoinPool::getPoolSize
的文档:
The result returned by this method may differ from getParallelism()
when threads are created to maintain parallelism when others are cooperatively blocked.
如您所见,sleep
不算作 合作阻止(我想这有点道理)。您可以阅读某种相关的
相关:
我正在研究通过 parallelStream 和 CompletableFutures 并行化网络调用的不同方式。因此,我遇到过 java 的 parallelStream 使用的 ForkJoinPool.commonPool() 的大小动态增长的情况,从 ~ #Cores 到最大值 64。
Java 详情:
$ java -version
openjdk version "11.0.10" 2021-01-19
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.10+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.10+9, mixed mode)
显示此类行为的代码如下(完整的可执行代码 here)
public static int loops = 100;
private static long sleepTimeMs = 1000;
private static ExecutorService customPool = Executors.newFixedThreadPool(loops);
// this method shows dynamic increase in pool size
public static void m1() {
Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
.parallel()
.map(number -> CompletableFuture.supplyAsync(
() -> DummyProcess.slowNetworkCall(number), customPool))
.map(CompletableFuture::join)
.mapToLong(Long::longValue)
.summaryStatistics();
}
// this method shows static pool size
public static void m2() {
Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops)
.parallel()
.map(DummyProcess::slowNetworkCall) // in this call, parallelism/poolsize stays constant 11
.summaryStatistics();
}
public static Long slowNetworkCall(Long i) {
Instant start = Instant.now();
// starts with 11 (#cores in my laptop = 12), goes upto 64
log.info(" {} going to sleep. poolsize: {}", i, ForkJoinPool.commonPool().getPoolSize());
try {
TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(" {} woke up..", i);
return Duration.between(start, Instant.now()).toMillis();
}
示例输出:
16:07:17.443 [pool-2-thread-7] INFO generalworks.parallelism.DummyProcess - 44 going to sleep. poolsize: 11
16:07:17.443 [pool-2-thread-9] INFO generalworks.parallelism.DummyProcess - 7 going to sleep. poolsize: 12
16:07:17.443 [pool-2-thread-4] INFO generalworks.parallelism.DummyProcess - 6 going to sleep. poolsize: 12
16:07:17.444 [pool-2-thread-13] INFO generalworks.parallelism.DummyProcess - 82 going to sleep. poolsize: 13
16:07:17.444 [pool-2-thread-14] INFO generalworks.parallelism.DummyProcess - 26 going to sleep. poolsize: 14
16:07:17.444 [pool-2-thread-15] INFO generalworks.parallelism.DummyProcess - 96 going to sleep. poolsize: 15
16:07:17.445 [pool-2-thread-16] INFO generalworks.parallelism.DummyProcess - 78 going to sleep. poolsize: 16
.
.
16:07:18.460 [pool-2-thread-79] INFO generalworks.parallelism.DummyProcess - 2 going to sleep. poolsize: 64
16:07:18.460 [pool-2-thread-71] INFO generalworks.parallelism.DummyProcess - 36 going to sleep. poolsize: 64
16:07:18.460 [pool-2-thread-74] INFO generalworks.parallelism.DummyProcess - 77 going to sleep. poolsize: 64
16:07:18.461 [pool-2-thread-83] INFO generalworks.parallelism.DummyProcess - 86 going to sleep. poolsize: 64
我知道公共池中的线程数,即 parallelism
是基于可用核心的最大数量,所以由于我的笔记本电脑有 12 个核心,我得到的并行度为 11 .但是我不明白为什么它在一种方法中一直在爬升,而在另一种方法中,它的大小保持不变
我相信你的答案是 here(ForkJoinPool
实现):
if ((wt = q.owner) != null &&
((ts = wt.getState()) == Thread.State.BLOCKED ||
ts == Thread.State.WAITING))
++bc; // worker is blocking
在你的代码的一个版本中,你在 Thread.sleep
上阻塞,这将线程置于 TIMED_WAITING
状态,而在另一个版本中,你在 CompletableFuture.join()
上阻塞,这使它进入 WAITING
状态。该实现区分了这些并展示了您观察到的不同行为。
CompletableFuture
里面也有特殊的代码,让它和ForkJoinPool
配合,防止在等待结果时饿死:
if (Thread.currentThread() instanceof ForkJoinWorkerThread)
ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
与您首先测试它的原因相关的结论:Thread.sleep()
没有正确模拟长网络调用。如果您执行了实际操作或其他一些阻塞操作,它将通过扩展池进行补偿。
提供的答案完全正确。根据 ForJoinPool::getPoolSize
的文档:
The result returned by this method may differ from
getParallelism()
when threads are created to maintain parallelism when others are cooperatively blocked.
如您所见,sleep
不算作 合作阻止(我想这有点道理)。您可以阅读某种相关的