等待 ExecutorService.execute 中的所有任务完成

Wait for all tasks in ExecutorService.execute to complete

使用
提交时如何等待所有任务完成 ExecutorService.execute()。有一个函数叫做 awaitTermination
但是必须在其中提供超时。这不能保证
returns 所有的任务都已经完成了。有办法实现吗?

execute 方法没有 return 任何东西。您可以使用 submit 方法,该方法 return 是 Future 类型的结果。

Future<String> future = 
  executorService.submit(callableTask/runnableTask);

如果使用class ThreadPoolExecutor or any of its children you have a method there getActiveCount() 即returns 正在主动执行任务的线程数。因此,您可以轮询该方法,直到它变为 0,这意味着所有任务都已完成并且当前没有新任务正在执行。但是,如果某些任务卡住了怎么办?我认为在这种情况下你还必须给出一些超时以防止无限循环。

这个思路最大的好处就是不需要调用shutdown方法

如果您阅读 ExecutorService.awaitTermination 的 javadoc(或查看方法签名),您将看到它 returns 和 boolean。此布尔值指示 Executor 是否终止。您可以使用该信息创建一个 while 循环以确定它是否已终止。

ExecutorService executor = ...

executor.shutdown(); // close the executor and don't accept new tasks

while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS) {}

像这样的事情会停止执行器并等待它终止并且所有任务都完成。

有几种方法。

你可以先调用 ExecutorService.shutdown and then ExecutorService.awaitTermination which returns:

true if this executor terminated and false if the timeout elapsed before termination

所以:

There is a function called awaitTermination But a timeout has to be provided in it. Which is not a guarantee that when this returns all the tasks would have been finished. Is there a way to achieve this ?

您只需循环调用 awaitTermination

使用 awaitTermination

此实现的完整示例:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100));
        }

        int count = 0;

        // This is the relevant part
        // Chose the delay most appropriate for your use case
        executor.shutdown();
        while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
            System.out.println("Waiting "+ count);
            count++;
        }
    }

    private static Runnable parallelWork(long sleepMillis) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
        };
    }
}

使用 CountDownLatch

另一种选择是创建一个 CountDownLatch,其中 count 等于并行任务的数量。每个线程调用 countDownLatch.countDown();,而 main 线程调用 countDownLatch.await();.

此实现的完整示例:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        CountDownLatch countDownLatch = new CountDownLatch(total_threads);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, countDownLatch));
        }
        countDownLatch.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CountDownLatch countDownLatch) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            countDownLatch.countDown();
        };
    }
}

使用循环屏障

另一种方法是使用 Cyclic Barrier

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        final int total_threads = 4;
        CyclicBarrier barrier = new CyclicBarrier(total_threads+ 1);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, barrier));
        }
        barrier.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CyclicBarrier barrier) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
              // Do something
            }
        };
    }
}

还有其他方法,但这些方法需要更改您的初始要求,即:

How to wait for all tasks to be completed when they are submitted using ExecutorService.execute() .