并行流"know"如何使用封闭的ForkJoinPool?
How does parallel stream "know" to use the enclosing ForkJoinPool?
在 Java 8 中,可以设置一个自定义的 forkJoinPool 以供并行流而不是公共池使用。
forkJoinPool.submit(() -> list.parallelStream().forEach(x ->{...} ))
我的问题是它在技术上是如何发生的?
流不会以任何方式意识到它已提交到自定义 forkJoinpool 并且无法直接访问它。那么最终如何使用正确的线程来处理流的任务呢?
我尝试查看源代码但无济于事。我最好的猜测是在提交时的某个时刻设置了一些 threadLocal 变量,然后由流稍后使用。如果是这样,为什么语言开发人员会选择这种方式来实现行为,而不是将池注入到流中?
谢谢!
java.util.stream.ForEachOps.ForEachOp#evaluateParallel
方法调用 invoke()
:
@Override
public <S> Void evaluateParallel(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
if (ordered)
new ForEachOrderedTask<>(helper, spliterator, this).invoke();
else
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
return null;
}
依次调用 java.util.concurrent.ForkJoinTask#doInvoke
:
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}
如上述方法所示,它使用Thread.currentThread()
找到当前线程。
然后它使用 .pool
字段,如 (wt = (ForkJoinWorkerThread)t).pool
中那样,它给出了当前线程在 运行 中的当前池:
public class ForkJoinWorkerThread extends Thread {
final ForkJoinPool pool; // the pool this thread works in
根据我阅读的代码,仅 根据触发计算的初始线程做出决定,在方法 ForkJoinTask::fork
内,字面意思是检查哪个线程触发了这个(也在它的文档中):
Thread.currentThread()) instanceof ForkJoinWorkerThread
因此,如果 ForkJoinWorkerThread
的一个实例启动了这个(这是您将通过自定义 ForkJoinPool
获得的),请使用池中已经存在的任何内容,并且此任务 运行 ;否则(如果它是 不是 ForkJoinWorkerThread
的实例 的不同线程)使用:
ForkJoinPool.common.externalPush(this);
同样有趣的是,ForkJoinWorkerThread
实际上是一个 public
class,因此您可以在它的一个实例中开始计算,但仍然使用不同的池;虽然我没有试过这个。
在 Java 8 中,可以设置一个自定义的 forkJoinPool 以供并行流而不是公共池使用。
forkJoinPool.submit(() -> list.parallelStream().forEach(x ->{...} ))
我的问题是它在技术上是如何发生的?
流不会以任何方式意识到它已提交到自定义 forkJoinpool 并且无法直接访问它。那么最终如何使用正确的线程来处理流的任务呢?
我尝试查看源代码但无济于事。我最好的猜测是在提交时的某个时刻设置了一些 threadLocal 变量,然后由流稍后使用。如果是这样,为什么语言开发人员会选择这种方式来实现行为,而不是将池注入到流中?
谢谢!
java.util.stream.ForEachOps.ForEachOp#evaluateParallel
方法调用 invoke()
:
@Override public <S> Void evaluateParallel(PipelineHelper<T> helper, Spliterator<S> spliterator) { if (ordered) new ForEachOrderedTask<>(helper, spliterator, this).invoke(); else new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke(); return null; }
依次调用 java.util.concurrent.ForkJoinTask#doInvoke
:
private int doInvoke() { int s; Thread t; ForkJoinWorkerThread wt; return (s = doExec()) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (wt = (ForkJoinWorkerThread)t).pool. awaitJoin(wt.workQueue, this, 0L) : externalAwaitDone(); }
如上述方法所示,它使用Thread.currentThread()
找到当前线程。
然后它使用 .pool
字段,如 (wt = (ForkJoinWorkerThread)t).pool
中那样,它给出了当前线程在 运行 中的当前池:
public class ForkJoinWorkerThread extends Thread { final ForkJoinPool pool; // the pool this thread works in
根据我阅读的代码,仅 根据触发计算的初始线程做出决定,在方法 ForkJoinTask::fork
内,字面意思是检查哪个线程触发了这个(也在它的文档中):
Thread.currentThread()) instanceof ForkJoinWorkerThread
因此,如果 ForkJoinWorkerThread
的一个实例启动了这个(这是您将通过自定义 ForkJoinPool
获得的),请使用池中已经存在的任何内容,并且此任务 运行 ;否则(如果它是 不是 ForkJoinWorkerThread
的实例 的不同线程)使用:
ForkJoinPool.common.externalPush(this);
同样有趣的是,ForkJoinWorkerThread
实际上是一个 public
class,因此您可以在它的一个实例中开始计算,但仍然使用不同的池;虽然我没有试过这个。