ForkJoinPool#awaitQuiescence 实际上是如何工作的?
How does ForkJoinPool#awaitQuiescence actually work?
我有 RecursiveAction
的下一个实现,这个 class 的唯一目的是从 0 打印到 9,但如果可能的话,从不同的线程打印:
public class MyRecursiveAction extends RecursiveAction {
private final int num;
public MyRecursiveAction(int num) {
this.num = num;
}
@Override
protected void compute() {
if (num < 10) {
System.out.println(num);
new MyRecursiveAction(num + 1).fork();
}
}
}
而且我认为调用 awaitQuiescence
将使当前线程等待所有任务(提交和分叉)完成:
public class Main {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.execute(new MyRecursiveAction(0));
System.out.println(forkJoinPool.awaitQuiescence(5, TimeUnit.SECONDS) ? "tasks" : "time");
}
}
但我并不总是得到正确的结果,而不是打印 10 次,打印 0 到 10 次。
但是如果我将 helpQuiesce
添加到 RecursiveAction
的实现中:
public class MyRecursiveAction extends RecursiveAction {
private final int num;
public MyRecursiveAction(int num) {
this.num = num;
}
@Override
protected void compute() {
if (num < 10) {
System.out.println(num);
new MyRecursiveAction(num + 1).fork();
}
RecursiveAction.helpQuiesce();//here
}
}
一切正常。
我想知道实际上 awaitQuiescence
等待什么?
您了解将 System.out.println(num);
更改为 System.out.println(num + " " + Thread.currentThread());
时会发生什么
这可能会打印出如下内容:
0 Thread[ForkJoinPool-1-worker-3,5,main]
1 Thread[main,5,main]
tasks
2 Thread[ForkJoinPool.commonPool-worker-3,5,main]
当awaitQuiescence
检测到有待处理的任务时,它会帮忙偷一个并直接执行。 Its documentation 说:
If called by a ForkJoinTask operating in this pool, equivalent in effect to ForkJoinTask.helpQuiesce(). Otherwise, waits and/or attempts to assist performing tasks until this pool isQuiescent() or the indicated timeout elapses.
强调是我加的
这发生在这里,正如我们所见,一个任务打印“main”作为它的执行线程。然后,fork()
的行为被指定为:
Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool()
if not inForkJoinPool()
.
由于main
线程不是ForkJoinPool
的工作线程,fork()
将向commonPool()
提交新任务。从那时起,从公共池的工作线程调用的 fork()
也会将下一个任务提交到公共池。但是在自定义池上调用的 awaitQuiescence
不会等待公共池的任务完成,因此 JVM 过早终止。
如果你要说这是一个有缺陷的API设计,我不会反对。
解决方案是不将 awaitQuiescence
用于公共池之外的任何地方¹。通常,拆分子任务的 RecursiveAction
应该等待它们完成。然后,您可以等待根任务完成以等待所有关联任务完成。
的后半部分包含此类 RecursiveAction
实现的示例。
¹ awaitQuiescence
当您没有实际的期货时很有用,例如提交到公共池的并行流。
Everything works fine.
不,它没有,你很幸运,它在你插入时起作用了:
RecursiveAction.helpQuiesce();
为了解释这一点,让我们稍微改变一下您的示例:
static class MyRecursiveAction extends RecursiveAction {
private final int num;
public MyRecursiveAction(int num) {
this.num = num;
}
@Override
protected void compute() {
if (num < 10) {
System.out.println(num);
new MyRecursiveAction(num + 1).fork();
}
}
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.execute(new MyRecursiveAction(0));
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
}
如果你 运行 这个,你会发现你得到了你期望得到的结果。这有两个主要原因。首先,fork
方法将执行 common pool
中的任务,正如其他答案已经解释的那样。其次,公共池中的线程是 daemon 线程。 JVM 在退出之前并没有等待它们完成,它很早就存在了。因此,如果是这种情况,您可能会问它为什么有效。这是因为这条线:
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
这会使 main
线程(非守护线程)休眠两秒钟,从而为 ForkJoinPool
提供足够的时间来执行您的任务。
现在让我们将代码更改为更接近您的示例:
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.execute(new MyRecursiveAction(0));
System.out.println(forkJoinPool.awaitQuiescence(5, TimeUnit.SECONDS) ? "tasks" : "time");
}
具体来说,您使用:forkJoinPool.awaitQuiescence(...)
,记录为:
Otherwise, waits and/or attempts to assist performing tasks...
它不是说它必然等待,它说它会“等待and/or attempt ...”,在这种情况下,它比 和 更多 或 。因此,它将尝试提供帮助,但仍然不会等待所有任务完成。这很奇怪甚至很愚蠢吗?
当您插入 RecursiveAction.helpQuiesce();
时,您最终会在幕后调用相同的 awaitQuiescence
(使用不同的参数)——因此基本上没有任何变化;根本问题依然存在:
static ForkJoinPool forkJoinPool = new ForkJoinPool();
static AtomicInteger res = new AtomicInteger(0);
public static void main(String[] args) {
forkJoinPool.execute(new MyRecursiveAction(0));
System.out.println(forkJoinPool.awaitQuiescence(5, TimeUnit.SECONDS) ? "tasks" : "time");
System.out.println(res.get());
}
static class MyRecursiveAction extends RecursiveAction {
private final int num;
public MyRecursiveAction(int num) {
this.num = num;
}
@Override
protected void compute() {
if (num < 10_000) {
res.incrementAndGet();
System.out.println(num + " thread : " + Thread.currentThread().getName());
new MyRecursiveAction(num + 1).fork();
}
RecursiveAction.helpQuiesce();
}
}
当我 运行 这样做时,它从未打印 10000
,表明该行的插入没有任何改变。
处理此类事情的通常默认方式是 fork
然后 join
。调用者中还有一个 join
,在调用 submit
时返回的 ForkJoinTask
上。类似于:
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
ForkJoinTask<Void> task = forkJoinPool.submit(new MyRecursiveAction(0));
task.join();
}
static class MyRecursiveAction extends RecursiveAction {
private final int num;
public MyRecursiveAction(int num) {
this.num = num;
}
@Override
protected void compute() {
if (num < 10) {
System.out.println(num);
MyRecursiveAction ac = new MyRecursiveAction(num + 1);
ac.fork();
ac.join();
}
}
}
我有 RecursiveAction
的下一个实现,这个 class 的唯一目的是从 0 打印到 9,但如果可能的话,从不同的线程打印:
public class MyRecursiveAction extends RecursiveAction {
private final int num;
public MyRecursiveAction(int num) {
this.num = num;
}
@Override
protected void compute() {
if (num < 10) {
System.out.println(num);
new MyRecursiveAction(num + 1).fork();
}
}
}
而且我认为调用 awaitQuiescence
将使当前线程等待所有任务(提交和分叉)完成:
public class Main {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.execute(new MyRecursiveAction(0));
System.out.println(forkJoinPool.awaitQuiescence(5, TimeUnit.SECONDS) ? "tasks" : "time");
}
}
但我并不总是得到正确的结果,而不是打印 10 次,打印 0 到 10 次。
但是如果我将 helpQuiesce
添加到 RecursiveAction
的实现中:
public class MyRecursiveAction extends RecursiveAction {
private final int num;
public MyRecursiveAction(int num) {
this.num = num;
}
@Override
protected void compute() {
if (num < 10) {
System.out.println(num);
new MyRecursiveAction(num + 1).fork();
}
RecursiveAction.helpQuiesce();//here
}
}
一切正常。
我想知道实际上 awaitQuiescence
等待什么?
您了解将 System.out.println(num);
更改为 System.out.println(num + " " + Thread.currentThread());
这可能会打印出如下内容:
0 Thread[ForkJoinPool-1-worker-3,5,main]
1 Thread[main,5,main]
tasks
2 Thread[ForkJoinPool.commonPool-worker-3,5,main]
当awaitQuiescence
检测到有待处理的任务时,它会帮忙偷一个并直接执行。 Its documentation 说:
If called by a ForkJoinTask operating in this pool, equivalent in effect to ForkJoinTask.helpQuiesce(). Otherwise, waits and/or attempts to assist performing tasks until this pool isQuiescent() or the indicated timeout elapses.
强调是我加的
这发生在这里,正如我们所见,一个任务打印“main”作为它的执行线程。然后,fork()
的行为被指定为:
Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the
ForkJoinPool.commonPool()
if notinForkJoinPool()
.
由于main
线程不是ForkJoinPool
的工作线程,fork()
将向commonPool()
提交新任务。从那时起,从公共池的工作线程调用的 fork()
也会将下一个任务提交到公共池。但是在自定义池上调用的 awaitQuiescence
不会等待公共池的任务完成,因此 JVM 过早终止。
如果你要说这是一个有缺陷的API设计,我不会反对。
解决方案是不将 awaitQuiescence
用于公共池之外的任何地方¹。通常,拆分子任务的 RecursiveAction
应该等待它们完成。然后,您可以等待根任务完成以等待所有关联任务完成。
RecursiveAction
实现的示例。
¹ awaitQuiescence
当您没有实际的期货时很有用,例如提交到公共池的并行流。
Everything works fine.
不,它没有,你很幸运,它在你插入时起作用了:
RecursiveAction.helpQuiesce();
为了解释这一点,让我们稍微改变一下您的示例:
static class MyRecursiveAction extends RecursiveAction {
private final int num;
public MyRecursiveAction(int num) {
this.num = num;
}
@Override
protected void compute() {
if (num < 10) {
System.out.println(num);
new MyRecursiveAction(num + 1).fork();
}
}
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.execute(new MyRecursiveAction(0));
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
}
如果你 运行 这个,你会发现你得到了你期望得到的结果。这有两个主要原因。首先,fork
方法将执行 common pool
中的任务,正如其他答案已经解释的那样。其次,公共池中的线程是 daemon 线程。 JVM 在退出之前并没有等待它们完成,它很早就存在了。因此,如果是这种情况,您可能会问它为什么有效。这是因为这条线:
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
这会使 main
线程(非守护线程)休眠两秒钟,从而为 ForkJoinPool
提供足够的时间来执行您的任务。
现在让我们将代码更改为更接近您的示例:
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.execute(new MyRecursiveAction(0));
System.out.println(forkJoinPool.awaitQuiescence(5, TimeUnit.SECONDS) ? "tasks" : "time");
}
具体来说,您使用:forkJoinPool.awaitQuiescence(...)
,记录为:
Otherwise, waits and/or attempts to assist performing tasks...
它不是说它必然等待,它说它会“等待and/or attempt ...”,在这种情况下,它比 和 更多 或 。因此,它将尝试提供帮助,但仍然不会等待所有任务完成。这很奇怪甚至很愚蠢吗?
当您插入 RecursiveAction.helpQuiesce();
时,您最终会在幕后调用相同的 awaitQuiescence
(使用不同的参数)——因此基本上没有任何变化;根本问题依然存在:
static ForkJoinPool forkJoinPool = new ForkJoinPool();
static AtomicInteger res = new AtomicInteger(0);
public static void main(String[] args) {
forkJoinPool.execute(new MyRecursiveAction(0));
System.out.println(forkJoinPool.awaitQuiescence(5, TimeUnit.SECONDS) ? "tasks" : "time");
System.out.println(res.get());
}
static class MyRecursiveAction extends RecursiveAction {
private final int num;
public MyRecursiveAction(int num) {
this.num = num;
}
@Override
protected void compute() {
if (num < 10_000) {
res.incrementAndGet();
System.out.println(num + " thread : " + Thread.currentThread().getName());
new MyRecursiveAction(num + 1).fork();
}
RecursiveAction.helpQuiesce();
}
}
当我 运行 这样做时,它从未打印 10000
,表明该行的插入没有任何改变。
处理此类事情的通常默认方式是 fork
然后 join
。调用者中还有一个 join
,在调用 submit
时返回的 ForkJoinTask
上。类似于:
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
ForkJoinTask<Void> task = forkJoinPool.submit(new MyRecursiveAction(0));
task.join();
}
static class MyRecursiveAction extends RecursiveAction {
private final int num;
public MyRecursiveAction(int num) {
this.num = num;
}
@Override
protected void compute() {
if (num < 10) {
System.out.println(num);
MyRecursiveAction ac = new MyRecursiveAction(num + 1);
ac.fork();
ac.join();
}
}
}