并行流"know"如何使用封闭的ForkJoinPool?

How does parallel stream "know" to use the enclosing ForkJoinPool?

在 Java 8 中,可以设置一个自定义的 forkJoinPool 以供并行流而不是公共池使用。

forkJoinPool.submit(() -> list.parallelStream().forEach(x ->{...} ))

我的问题是它在技术上是如何发生的?
流不会以任何方式意识到它已提交到自定义 forkJoinpool 并且无法直接访问它。那么最终如何使用正确的线程来处理流的任务呢?

我尝试查看源代码但无济于事。我最好的猜测是在提交时的某个时刻设置了一些 threadLocal 变量,然后由流稍后使用。如果是这样,为什么语言开发人员会选择这种方式来实现行为,而不是将池注入到流中?

谢谢!

java.util.stream.ForEachOps.ForEachOp#evaluateParallel 方法调用 invoke():

@Override
public <S> Void evaluateParallel(PipelineHelper<T> helper,
                                 Spliterator<S> spliterator) {
    if (ordered)
        new ForEachOrderedTask<>(helper, spliterator, this).invoke();
    else
        new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
    return null;
}

依次调用 java.util.concurrent.ForkJoinTask#doInvoke:

private int doInvoke() {
    int s; Thread t; ForkJoinWorkerThread wt;
    return (s = doExec()) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (wt = (ForkJoinWorkerThread)t).pool.
        awaitJoin(wt.workQueue, this, 0L) :
        externalAwaitDone();
}

如上述方法所示,它使用Thread.currentThread()找到当前线程。

然后它使用 .pool 字段,如 (wt = (ForkJoinWorkerThread)t).pool 中那样,它给出了当前线程在 运行 中的当前池:

public class ForkJoinWorkerThread extends Thread {

    final ForkJoinPool pool;                // the pool this thread works in

根据我阅读的代码, 根据触发计算的初始线程做出决定,在方法 ForkJoinTask::fork 内,字面意思是检查哪个线程触发了这个(也在它的文档中):

Thread.currentThread()) instanceof ForkJoinWorkerThread

因此,如果 ForkJoinWorkerThread 的一个实例启动了这个(这是您将通过自定义 ForkJoinPool 获得的),请使用池中已经存在的任何内容,并且此任务 运行 ;否则(如果它是 不是 ForkJoinWorkerThread 的实例 的不同线程)使用:

ForkJoinPool.common.externalPush(this); 

同样有趣的是,ForkJoinWorkerThread 实际上是一个 public class,因此您可以在它的一个实例中开始计算,但仍然使用不同的池;虽然我没有试过这个。