Java ForkJoinPool - 任务在队列中的顺序
Java ForkJoinPool - order of tasks in queues
我想了解 Java fork-join 池中任务的处理顺序。
到目前为止,我在文档中找到的唯一相关信息是关于一个名为 "asyncMode" 的参数,即 "true if this pool uses local first-in-first-out scheduling mode for forked tasks that are never joined".
我对这个说法的理解是每个worker都有自己的任务队列;工人从自己队列的前面接任务,或者如果他们自己的队列是空的,则从其他工人队列的后面偷走任务;如果 asyncMode 为 true(resp.false),工作人员将新分叉的任务添加到自己队列的后面(resp.front)。
理解有误请指正!
现在,这提出了几个问题:
1) 加入的分叉任务的顺序是什么?
我的猜测是,当任务被分叉时,它会按照我上面的解释被添加到工作人员的队列中。现在,假设任务加入...
如果在调用join时任务还没有开始,调用join的worker会把任务从队列中拉出来并立即开始处理。
如果在调用join时任务已经被其他worker抢走,那么调用join的worker会同时处理其他任务(按照我的获取任务的顺序)上面的解释),直到它加入的任务被偷走它的工人完成。
这个猜测是基于用打印语句编写简单的测试代码,并观察改变连接调用顺序影响任务处理顺序的方式。有人可以告诉我我的猜测是否正确吗?
2) 对外提交的任务顺序是什么?
根据 ,fork-join 池不使用外部队列。 (顺便说一下,我使用的是 Java 8。)
所以我是不是理解为当一个任务被外部提交时,任务被添加到一个随机选择的工作队列?
如果是,外部提交的任务是加到队尾还是队前?
最后,这个是看任务是调用pool.execute(task)提交还是调用pool.invoke(task)提交?这是否取决于调用 pool.execute(task) 或 pool.invoke(task) 的线程是外部线程还是此 fork-join 池中的线程?
- 你猜对了,你完全正确。
正如您在“Implementation overview”中看到的那样。
* Joining Tasks
* =============
*
* Any of several actions may be taken when one worker is waiting
* to join a task stolen (or always held) by another. Because we
* are multiplexing many tasks on to a pool of workers, we can't
* just let them block (as in Thread.join). We also cannot just
* reassign the joiner's run-time stack with another and replace
* it later, which would be a form of "continuation", that even if
* possible is not necessarily a good idea since we may need both
* an unblocked task and its continuation to progress. Instead we
* combine two tactics:
*
* Helping: Arranging for the joiner to execute some task that it
* would be running if the steal had not occurred.
*
* Compensating: Unless there are already enough live threads,
* method tryCompensate() may create or re-activate a spare
* thread to compensate for blocked joiners until they unblock.
2.Both ForkJoinPool.invoke和ForkJoinPool.join在提交任务的方式上是完全一样的。代码中可以看到
public <T> T invoke(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task.join();
}
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
}
在 externalPush 中,您可以看到使用 ThreadLocalRandom 将任务添加到随机选择的工作队列中。而且,它是用push的方式进入到队头的。
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
int r = ThreadLocalRandom.getProbe();
int rs = runState;
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
U.putIntVolatile(q, QLOCK, 0);
if (n <= 1)
signalWork(ws, q);
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
externalSubmit(task);
}
我不知道你说的是什么意思:
And does this depend on whether the thread calling pool.execute(task) or pool.invoke(task) is an external thread or a thread within this fork-join pool?
我想了解 Java fork-join 池中任务的处理顺序。
到目前为止,我在文档中找到的唯一相关信息是关于一个名为 "asyncMode" 的参数,即 "true if this pool uses local first-in-first-out scheduling mode for forked tasks that are never joined".
我对这个说法的理解是每个worker都有自己的任务队列;工人从自己队列的前面接任务,或者如果他们自己的队列是空的,则从其他工人队列的后面偷走任务;如果 asyncMode 为 true(resp.false),工作人员将新分叉的任务添加到自己队列的后面(resp.front)。
理解有误请指正!
现在,这提出了几个问题:
1) 加入的分叉任务的顺序是什么?
我的猜测是,当任务被分叉时,它会按照我上面的解释被添加到工作人员的队列中。现在,假设任务加入...
如果在调用join时任务还没有开始,调用join的worker会把任务从队列中拉出来并立即开始处理。
如果在调用join时任务已经被其他worker抢走,那么调用join的worker会同时处理其他任务(按照我的获取任务的顺序)上面的解释),直到它加入的任务被偷走它的工人完成。
这个猜测是基于用打印语句编写简单的测试代码,并观察改变连接调用顺序影响任务处理顺序的方式。有人可以告诉我我的猜测是否正确吗?
2) 对外提交的任务顺序是什么?
根据
所以我是不是理解为当一个任务被外部提交时,任务被添加到一个随机选择的工作队列?
如果是,外部提交的任务是加到队尾还是队前?
最后,这个是看任务是调用pool.execute(task)提交还是调用pool.invoke(task)提交?这是否取决于调用 pool.execute(task) 或 pool.invoke(task) 的线程是外部线程还是此 fork-join 池中的线程?
- 你猜对了,你完全正确。 正如您在“Implementation overview”中看到的那样。
* Joining Tasks * ============= * * Any of several actions may be taken when one worker is waiting * to join a task stolen (or always held) by another. Because we * are multiplexing many tasks on to a pool of workers, we can't * just let them block (as in Thread.join). We also cannot just * reassign the joiner's run-time stack with another and replace * it later, which would be a form of "continuation", that even if * possible is not necessarily a good idea since we may need both * an unblocked task and its continuation to progress. Instead we * combine two tactics: * * Helping: Arranging for the joiner to execute some task that it * would be running if the steal had not occurred. * * Compensating: Unless there are already enough live threads, * method tryCompensate() may create or re-activate a spare * thread to compensate for blocked joiners until they unblock.
2.Both ForkJoinPool.invoke和ForkJoinPool.join在提交任务的方式上是完全一样的。代码中可以看到
public <T> T invoke(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task.join();
}
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
}
在 externalPush 中,您可以看到使用 ThreadLocalRandom 将任务添加到随机选择的工作队列中。而且,它是用push的方式进入到队头的。
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
int r = ThreadLocalRandom.getProbe();
int rs = runState;
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
U.putIntVolatile(q, QLOCK, 0);
if (n <= 1)
signalWork(ws, q);
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
externalSubmit(task);
}
我不知道你说的是什么意思:
And does this depend on whether the thread calling pool.execute(task) or pool.invoke(task) is an external thread or a thread within this fork-join pool?