ForkJoinPool#awaitQuiescence 实际上是如何工作的?

How does ForkJoinPool#awaitQuiescence actually work?

我有 RecursiveAction 的下一个实现,这个 class 的唯一目的是从 0 打印到 9,但如果可能的话,从不同的线程打印:

public class MyRecursiveAction extends RecursiveAction {
    private final int num;

    public MyRecursiveAction(int num) {
        this.num = num;
    }

    @Override
    protected void compute() {
        if (num < 10) {
            System.out.println(num);
            new MyRecursiveAction(num + 1).fork();
        }
    }
}

而且我认为调用 awaitQuiescence 将使当前线程等待所有任务(提交和分叉)完成:

public class Main {
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        forkJoinPool.execute(new MyRecursiveAction(0));
        System.out.println(forkJoinPool.awaitQuiescence(5, TimeUnit.SECONDS) ? "tasks" : "time");
    }
}

但我并不总是得到正确的结果,而不是打印 10 次,打印 0 到 10 次。

但是如果我将 helpQuiesce 添加到 RecursiveAction 的实现中:

public class MyRecursiveAction extends RecursiveAction {
    private final int num;

    public MyRecursiveAction(int num) {
        this.num = num;
    }

    @Override
    protected void compute() {
        if (num < 10) {
            System.out.println(num);
            new MyRecursiveAction(num + 1).fork();
        }

        RecursiveAction.helpQuiesce();//here
    }
}

一切正常。

我想知道实际上 awaitQuiescence 等待什么?

您了解将 System.out.println(num); 更改为 System.out.println(num + " " + Thread.currentThread());

时会发生什么

这可能会打印出如下内容:

0 Thread[ForkJoinPool-1-worker-3,5,main]
1 Thread[main,5,main]
tasks
2 Thread[ForkJoinPool.commonPool-worker-3,5,main]

awaitQuiescence检测到有待处理的任务时,它会帮忙偷一个并直接执行。 Its documentation 说:

If called by a ForkJoinTask operating in this pool, equivalent in effect to ForkJoinTask.helpQuiesce(). Otherwise, waits and/or attempts to assist performing tasks until this pool isQuiescent() or the indicated timeout elapses.

强调是我加的

这发生在这里,正如我们所见,一个任务打印“main”作为它的执行线程。然后,fork() 的行为被指定为:

Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool().

由于main 线程不是ForkJoinPool 的工作线程,fork() 将向commonPool() 提交新任务。从那时起,从公共池的工作线程调用的 fork() 也会将下一个任务提交到公共池。但是在自定义池上调用的 awaitQuiescence 不会等待公共池的任务完成,因此 JVM 过早终止。

如果你要说这是一个有缺陷的API设计,我不会反对。

解决方案是不将 awaitQuiescence 用于公共池之外的任何地方¹。通常,拆分子任务的 RecursiveAction 应该等待它们完成。然后,您可以等待根任务完成以等待所有关联任务完成。

的后半部分包含此类 RecursiveAction 实现的示例。

¹ awaitQuiescence 当您没有实际的期货时很有用,例如提交到公共池的并行流。

Everything works fine.

不,它没有,你很幸运,它在你插入时起作用了:

RecursiveAction.helpQuiesce();

为了解释这一点,让我们稍微改变一下您的示例:

static class MyRecursiveAction extends RecursiveAction {

    private final int num;

    public MyRecursiveAction(int num) {
        this.num = num;
    }

    @Override
    protected void compute() {
        if (num < 10) {
            System.out.println(num);
            new MyRecursiveAction(num + 1).fork();
        }
    }

}


public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    forkJoinPool.execute(new MyRecursiveAction(0));
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
}

如果你 运行 这个,你会发现你得到了你期望得到的结果。这有两个主要原因。首先,fork 方法将执行 common pool 中的任务,正如其他答案已经解释的那样。其次,公共池中的线程是 daemon 线程。 JVM 在退出之前并没有等待它们完成,它很早就存在了。因此,如果是这种情况,您可能会问它为什么有效。这是因为这条线:

LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));

这会使 main 线程(非守护线程)休眠两秒钟,从而为 ForkJoinPool 提供足够的时间来执行您的任务。

现在让我们将代码更改为更接近您的示例:

public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    forkJoinPool.execute(new MyRecursiveAction(0));
    System.out.println(forkJoinPool.awaitQuiescence(5, TimeUnit.SECONDS) ? "tasks" : "time");
}

具体来说,您使用:forkJoinPool.awaitQuiescence(...),记录为:

Otherwise, waits and/or attempts to assist performing tasks...

不是说它必然等待,它说它会“等待and/or attempt ...”,在这种情况下,它比 更多 。因此,它将尝试提供帮助,但仍然不会等待所有任务完成。这很奇怪甚至很愚蠢吗?

当您插入 RecursiveAction.helpQuiesce(); 时,您最终会在幕后调用相同的 awaitQuiescence(使用不同的参数)——因此基本上没有任何变化;根本问题依然存在:

static ForkJoinPool forkJoinPool = new ForkJoinPool();
static AtomicInteger res = new AtomicInteger(0);

public static void main(String[] args) {
    forkJoinPool.execute(new MyRecursiveAction(0));
    System.out.println(forkJoinPool.awaitQuiescence(5, TimeUnit.SECONDS) ? "tasks" : "time");
    System.out.println(res.get());
}

static class MyRecursiveAction extends RecursiveAction {

    private final int num;

    public MyRecursiveAction(int num) {
        this.num = num;
    }

    @Override
    protected void compute() {
        if (num < 10_000) {
            res.incrementAndGet();
            System.out.println(num + " thread : " + Thread.currentThread().getName());
            new MyRecursiveAction(num + 1).fork();
        }

        RecursiveAction.helpQuiesce();

    }

}

当我 运行 这样做时,它从未打印 10000,表明该行的插入没有任何改变。


处理此类事情的通常默认方式是 fork 然后 join。调用者中还有一个 join,在调用 submit 时返回的 ForkJoinTask 上。类似于:

public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool(2);
    ForkJoinTask<Void> task = forkJoinPool.submit(new MyRecursiveAction(0));
    task.join();
}

static class MyRecursiveAction extends RecursiveAction {
    private final int num;

    public MyRecursiveAction(int num) {
        this.num = num;
    }

    @Override
    protected void compute() {
        if (num < 10) {
            System.out.println(num);
            MyRecursiveAction ac = new MyRecursiveAction(num + 1);
            ac.fork();
            ac.join();
        }
    }
}