核心计数任务 CompletableFuture 比 parallelStream 慢

core-count tasks CompletableFuture slower than parallelStream

我的电脑是四核的(仅供参考)

CompletableFuture 将使用 ForkJoinPool.commonPool(),因为其 official doc 指出:

All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task).

我从CompletableFuture.supplyAsync(Supplier<U> supplier)

调试出来如下代码
private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);

/**
 * Default executor -- ForkJoinPool.commonPool() unless it cannot
 * support parallelism.
 */
private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

这意味着 parallelStream 总是使用 ForkJoinPool.commonPool(),但这里是它更快的原因。

我试着把它们打印出来,发现使用 CompletableFuture 时只有 三个 个线程:

private static int concurrencyGet() {
    List<CompletableFuture<Integer>> futureList = IntStream.rangeClosed(0, 10).boxed()
   .map(i -> CompletableFuture.supplyAsync(() -> getNumber(i)))
            .collect(Collectors.toList());
    return futureList.stream().map(future -> future.join()).reduce(0, Integer::sum);
}

但是 parallelStream 使用 four 包括 main 线程。

我的猜测是,在 CompletableFuture.supplyAsync() 中,ForkJoinPool.getCommonPoolParallelism() 只有 三个 而主线程占用四个之一,因为它是 异步

但是 parallelStream 将用完所有 four,因为它不是 asynchronous

这是正确的吗?我想知道有没有关于这个问题的一些官方文档?

感谢您的帮助。

以下是我从 Venkat Subramaniam关于 Parallel and Asynchronous Programming with Streams and CompletableFuture 的演讲中理解的:

由于 CompleteableFuture 也使用 ForkJoinPool.commonPool() 它也可以使用主线程,并且在某些情况下确实如此。

给出以下示例

public static void main(String[] args) {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> numberSupplier());
    future.thenAccept(i -> System.out.println("f: " + i + " - " + Thread.currentThread()));
    sleep(100); //wait for async operations to finish before exiting
}

private static Integer numberSupplier() {
    Integer n = 2;
    System.out.println("c: " + n + " - " + Thread.currentThread());
    sleep(19);
    return n;
}

private static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

您可能会得到这样的控制台输出:

c: 2 - Thread[ForkJoinPool.commonPool-worker-1,5,main]
f: 2 - Thread[ForkJoinPool.commonPool-worker-1,5,main]

supplyAsync(..)thenAccept(..) 部分都由 ForkJoinPool.

的工作线程执行

但是,如果给 supplyAsync(..)Supplier<Integer> 太快了,以至于在调用 thenAccept(..) 时就完成了,那么第二部分也可以在主线程:

private static Integer numberSupplier() {
    Integer n = 2;
    //System.out.println("c: " + n + " - " + Thread.currentThread());
    //sleep(19);
    return n;
}

输出:

f: 2 - Thread[main,5,main]