ForkJoinPool.invoke() 和 ForkJoinTask.invoke() 或 compute()

ForkJoinPool.invoke() and ForkJoinTask.invoke() or compute()

我正在阅读 Java ForkJoin 框架。不直接在 ForkJoinTask 的实现上调用 invoke()(例如 RecursiveTask),而是实例化 ForkJoinPool 并调用 pool.invoke(task) 有什么额外的好处?当我们将这 2 个方法都称为 invoke 时究竟发生了什么?

从源头看来,如果调用 recursiveTask.invoke,它将以托管线程池的方式调用其 exec 并最终调用 compute。因此,更令人困惑的是为什么我们有成语 pool.invoke(task).

我写了一些简单的代码来测试性能差异,但我没有看到任何差异。也许测试代码是错误的?见下文:

public class MyForkJoinTask extends RecursiveAction {

    private static int totalWorkInMillis = 20000;
    protected static int sThreshold = 1000;

    private int workInMillis;


    public MyForkJoinTask(int work) {
        this.workInMillis = work;
    }

    // Average pixels from source, write results into destination.
    protected void computeDirectly() {
        try {

            ForkJoinTask<Object> objectForkJoinTask = new ForkJoinTask<>();
            Thread.sleep(workInMillis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void compute() {
        if (workInMillis < sThreshold) {
            computeDirectly();
            return;
        }

        int discountedWork = (int) (workInMillis * 0.9);
        int split = discountedWork / 2;

        invokeAll(new MyForkJoinTask(split),
                new MyForkJoinTask(split));
    }

    public static void main(String[] args) throws Exception {
        System.out.printf("Total work is %d in millis.%n", totalWorkInMillis);
        System.out.printf("Threshold is %d in millis.%n", sThreshold);

        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println(Integer.toString(processors) + " processor"
                + (processors != 1 ? "s are " : " is ")
                + "available");

        MyForkJoinTask fb = new MyForkJoinTask(totalWorkInMillis);

        ForkJoinPool pool = new ForkJoinPool();

        long startTime = System.currentTimeMillis();


        // These 2 seems no difference!
        pool.invoke(fb);
//        fb.compute();


        long endTime = System.currentTimeMillis();

        System.out.println("Took " + (endTime - startTime) +
                " milliseconds.");
    }
}

RecursiveTaskclass的compute()方法只是一个包含任务代码的抽象方法。它不使用池中的新线程,如果您正常调用它,它不会 运行 在池管理的线程中。

fork 连接池上的 invoke 方法向池提交任务,然后在单独的线程上启动 运行ning,调用该线程上的 compute 方法,然后等待结果。

您可以在 java 文档中针对 RecursiveTask 和 ForkJoinPool 的措辞中看到这一点。 invoke() 方法实际执行任务,而 compute() 方法只是封装了计算。

protected abstract V compute()

The main computation performed by this task.

和 ForkJoinPool

public <T> T invoke(ForkJoinTask<T> task)

Performs the given task, returning its result upon completion. ...

因此,对于计算方法,您正在做的是 运行 在 fork 连接池之外对 compute 的第一次调用。您可以通过在计算方法中添加日志行来对此进行测试。

System.out.println(this.inForkJoinPool());

您还可以通过记录线程 ID

来检查它是否在同一线程中 运行
System.out.println(Thread.currentThread().getId());

一旦您调用 invokeAll,该调用中包含的子任务就会 运行 在一个池中。但请注意,它不一定是您在调用 compute() 之前创建的池中的 运行。您可以注释掉您的 new ForkJoinPool() 代码,它仍然会 运行。有趣的是,java 7 文档说如果在池管理线程之外调用 invokeAll() 方法将抛出异常,但 java 8 文档不会。请注意,我还没有在 java 中测试过 7(仅 8)。但很可能,您的代码在 java 7.

中直接调用 compute() 时会抛出异常

两个结果同时返回的原因是毫秒不够准确,无法记录在池管理线程中启动第一个线程的差异,或者只是 运行 宁第一个 compute 在现有线程中调用。

Sierra 和 Bates 的 OCA/OCP 学习指南建议您使用 fork join 框架的方法是从池中调用 invoke()。它明确了你使用的是哪个池,也意味着你可以将多个任务提交到同一个池中,这样就节省了每次重新创建新池的开销。从逻辑上讲,将所有任务计算保存在池管理的线程中(或者至少我认为是)。

pool.invoke() 调用在特定池上调用;当首次调用 task.invoketask.invokeAll 时,而不是让框架自行创建。这意味着您可以将池重新用于新任务,并在创建池时指定活动线程数等内容。这就是区别。将这些日志行添加到您的代码中,尝试一下,您就会看到它在做什么