Java parallelstream 在使用 newCachedThreadPool() 时未使用最佳线程数

Java parallelstream not using optimal number of threads when using newCachedThreadPool()

我做了两个独立的数据库并行读取实现。 第一个实现是使用 ExecutorServicenewCachedThreadPool() 构造函数和 Futures:我简单地调用 returns 每个读取案例的未来,然后在我完成所有调用后我调用 get() 在他们。此实现工作正常并且速度足够快。

第二种实现是使用并行流。当我将并行流调用放入同一个 ExecutorService 池时,它的工作速度几乎 5 倍慢 并且它似乎没有使用我希望的那么多线程。当我改为将它放入 ForkJoinPool pool = new ForkJoinPool(50) 时,它的运行速度与之前的实现一样快。

我的问题是:

为什么并行流在 newCachedThreadPool 版本中未充分利用线程?

这是第二个实现的代码(我没有发布第一个实现,因为无论如何它都能正常工作):

private static final ExecutorService pool = Executors.newCachedThreadPool();

final List<AbstractMap.SimpleImmutableEntry<String, String>> simpleImmutableEntryStream =
                personIdList.stream().flatMap(
                        personId -> movieIdList.stream().map(
                                movieId -> new AbstractMap.SimpleImmutableEntry<>(personId, movieId))).collect(Collectors.toList());

final Future<Map<String, List<Summary>>> futureMovieSummaryForPerson = pool.submit(() -> {
      final Stream<Summary> summaryStream = simpleImmutableEntryStream.parallelStream().map(
            inputPair -> {
                   return FeedbackDao.find(inputPair.getKey(), inputPair.getValue());
            }).filter(Objects::nonNull);
return summaryStream.collect(Collectors.groupingBy(Summary::getPersonId));
});

这与 ForkJoinTask.fork 的实现方式有关,如果当前线程来自 ForkJoinPool 它将使用相同的池来提交新任务,但如果不是,它将使用公共池使用本地计算机中的处理器总数以及此处当您使用 Executors.newCachedThreadPool() 创建池时,该池创建的线程不会被识别为来自 ForkJoinPool,因此它使用公共池。

下面是它的实现方式,应该可以帮助您更好地理解:

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

Executors.newCachedThreadPool() 创建的线程将不是 ForkJoinWorkerThread 类型,因此它将使用池大小未优化的公共池来提交新任务。