ForkJoinPool size increasing dynamically?

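I am researching different ways of parallelizing network calls with parallelStream and CompletableFuture. While doing so, I came across a situation where the size of ForkJoinPool.commonPool(), which Java's parallelStream uses, grows dynamically from roughly the number of cores up to a maximum of 64.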

Java details: $ java -version

openjdk version "11.0.10" 2021-01-19
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.10+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.10+9, mixed mode)

The code that shows this behavior is below (full executable code here):


    public static int loops = 100;
    private static long sleepTimeMs = 1000;
    private static ExecutorService customPool = Executors.newFixedThreadPool(loops);

    // this method shows a dynamic increase in pool size
    public static void m1() {
        Instant start = Instant.now();
        LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
                .parallel()
                .map(number -> CompletableFuture.supplyAsync(
                        () -> DummyProcess.slowNetworkCall(number), customPool))
                .map(CompletableFuture::join)
                .mapToLong(Long::longValue)
                .summaryStatistics();

    }

    // this method shows static pool size
    public static void m2() {
        Instant start = Instant.now();
        LongSummaryStatistics stats = LongStream.range(0, loops)
                .parallel()
                .map(DummyProcess::slowNetworkCall) // in this call, parallelism/poolsize stays constant 11
                .summaryStatistics();
    }


    public static Long slowNetworkCall(Long i) {
        Instant start = Instant.now();
        // starts with 11 (#cores in my laptop = 12), goes up to 64
        log.info(" {} going to sleep. poolsize: {}", i, ForkJoinPool.commonPool().getPoolSize());
        try {
            TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info(" {} woke up..", i);
        return Duration.between(start, Instant.now()).toMillis();
    }

Sample output:

16:07:17.443 [pool-2-thread-7] INFO  generalworks.parallelism.DummyProcess -  44 going to sleep. poolsize: 11
16:07:17.443 [pool-2-thread-9] INFO  generalworks.parallelism.DummyProcess -  7 going to sleep. poolsize: 12
16:07:17.443 [pool-2-thread-4] INFO  generalworks.parallelism.DummyProcess -  6 going to sleep. poolsize: 12
16:07:17.444 [pool-2-thread-13] INFO  generalworks.parallelism.DummyProcess -  82 going to sleep. poolsize: 13
16:07:17.444 [pool-2-thread-14] INFO  generalworks.parallelism.DummyProcess -  26 going to sleep. poolsize: 14
16:07:17.444 [pool-2-thread-15] INFO  generalworks.parallelism.DummyProcess -  96 going to sleep. poolsize: 15
16:07:17.445 [pool-2-thread-16] INFO  generalworks.parallelism.DummyProcess -  78 going to sleep. poolsize: 16
.
.
16:07:18.460 [pool-2-thread-79] INFO  generalworks.parallelism.DummyProcess -  2 going to sleep. poolsize: 64
16:07:18.460 [pool-2-thread-71] INFO  generalworks.parallelism.DummyProcess -  36 going to sleep. poolsize: 64
16:07:18.460 [pool-2-thread-74] INFO  generalworks.parallelism.DummyProcess -  77 going to sleep. poolsize: 64
16:07:18.461 [pool-2-thread-83] INFO  generalworks.parallelism.DummyProcess -  86 going to sleep. poolsize: 64

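I understand that the number of threads in the common pool, i.e. its parallelism, is based on the number of available cores (one less than that number by default), so since my laptop has 12 cores I get a parallelism of 11. What I don't understand is why the pool size keeps climbing in one method (m1) while it stays constant in the other (m2).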
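As a quick check of the numbers above, the following sketch prints the core count next to the common pool's configured parallelism and its current size. The class name `CommonPoolInfo` and the helper `defaultParallelism` are made up for illustration; the helper assumes the default rule (available processors minus one, but at least 1), which only applies when the `java.util.concurrent.ForkJoinPool.common.parallelism` system property is not set.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {

    // Illustrative helper (not a JDK API): the default common-pool parallelism
    // for a given core count, assuming no system property overrides it.
    static int defaultParallelism(int cores) {
        return Math.max(1, cores - 1);
    }

    // Target parallelism of the common pool as configured in this JVM.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("available processors: " + cores);
        System.out.println("expected default    : " + defaultParallelism(cores));
        System.out.println("getParallelism()    : " + commonPoolParallelism());
        // getPoolSize() counts live worker threads, so it may differ from the target.
        System.out.println("getPoolSize()       : " + ForkJoinPool.commonPool().getPoolSize());
    }
}
```

Note that `getParallelism()` is the target level, while `getPoolSize()` is the number of worker threads currently alive, which is the value logged in the question.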

I believe your answer is here (the ForkJoinPool implementation):

                        if ((wt = q.owner) != null &&
                            ((ts = wt.getState()) == Thread.State.BLOCKED ||
                             ts == Thread.State.WAITING))
                            ++bc;            // worker is blocking

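In one version of your code (m2) the pool's threads block on Thread.sleep, which puts them in the TIMED_WAITING state, while in the other (m1) they block on CompletableFuture.join(), which puts them in the WAITING state. The implementation distinguishes between these states, which produces the different behaviors you observed.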
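The difference in thread states can be observed directly. The sketch below is only an illustration: `BlockedStateDemo` and its helpers are hypothetical names, and it uses plain threads rather than ForkJoinPool workers, so it shows the states themselves and not the pool's reaction to them. One thread sleeps, the other joins a future that is never completed, and each thread's state is polled from the outside.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class BlockedStateDemo {

    // Runs the action on a daemon thread and polls (for up to about 5 s) until
    // that thread reports WAITING or TIMED_WAITING; returns the last state seen.
    static Thread.State stateWhileBlocked(Runnable action) {
        Thread t = new Thread(action);
        t.setDaemon(true); // do not keep the JVM alive for the blocked thread
        t.start();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        Thread.State state = t.getState();
        try {
            while (state != Thread.State.WAITING
                    && state != Thread.State.TIMED_WAITING
                    && System.nanoTime() < deadline) {
                Thread.sleep(10);
                state = t.getState();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return state;
    }

    // State of a thread that is blocked in a timed sleep.
    static Thread.State sleepState() {
        return stateWhileBlocked(() -> {
            try {
                TimeUnit.SECONDS.sleep(30);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // State of a thread that is blocked in join() on a future nobody completes.
    static Thread.State joinState() {
        CompletableFuture<Long> neverCompleted = new CompletableFuture<>();
        return stateWhileBlocked(neverCompleted::join);
    }

    public static void main(String[] args) {
        System.out.println("Thread.sleep:           " + sleepState());
        System.out.println("CompletableFuture.join: " + joinState());
    }
}
```

The sleeping thread should report TIMED_WAITING and the joining thread WAITING, and only the latter is matched by the state check quoted from the ForkJoinPool source.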

CompletableFuture also contains special code that lets it cooperate with ForkJoinPool, to prevent starvation while waiting for a result:

            if (Thread.currentThread() instanceof ForkJoinWorkerThread)
                ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);

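And the conclusion relevant to why you were testing this in the first place: Thread.sleep() does not properly simulate a long network call. If you performed an actual network call or some other blocking operation, the pool would compensate by expanding.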
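If the goal is to keep a sleep-based simulation while still letting the pool know that a worker is blocked, one option is the `ForkJoinPool.ManagedBlocker` interface together with `ForkJoinPool.managedBlock`. The sketch below is an assumption about how the question's test could be adapted, not part of the original code: `ManagedSleep`, `SleepBlocker`, and `managedSleep` are hypothetical names. According to the Javadoc, `managedBlock` only "possibly" activates a spare thread, so the resulting pool size is not guaranteed.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;

public class ManagedSleep {

    // Hypothetical helper: a sleep that announces itself to the pool as blocking.
    static final class SleepBlocker implements ForkJoinPool.ManagedBlocker {
        private final long millis;
        private volatile boolean done;

        SleepBlocker(long millis) {
            this.millis = millis;
        }

        @Override
        public boolean block() throws InterruptedException {
            if (!done) {
                TimeUnit.MILLISECONDS.sleep(millis);
                done = true;
            }
            return true; // true means no further blocking is necessary
        }

        @Override
        public boolean isReleasable() {
            return done; // true once the sleep has finished
        }
    }

    // Sleeps through managedBlock and returns the elapsed time in milliseconds.
    static long managedSleep(long millis) {
        long start = System.nanoTime();
        try {
            ForkJoinPool.managedBlock(new SleepBlocker(millis));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        // Same shape as m2 in the question, but with a cooperative sleep.
        long slowest = LongStream.range(0, 100)
                .parallel()
                .map(i -> managedSleep(1000))
                .max()
                .orElse(0);
        System.out.println("slowest call (ms): " + slowest);
        System.out.println("pool size        : " + ForkJoinPool.commonPool().getPoolSize());
    }
}
```

When called from a thread that is not a ForkJoinPool worker, `managedBlock` simply runs the blocker, so the helper stays usable outside parallel streams.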

The answer provided is entirely correct. According to the documentation of ForkJoinPool::getPoolSize:

The result returned by this method may differ from getParallelism() when threads are created to maintain parallelism when others are cooperatively blocked.

As you can see, sleep does not count as cooperative blocking (which I suppose makes some sense). You can read a somewhat related