等待 ExecutorService.execute 中的所有任务完成
Wait for all tasks in ExecutorService.execute to complete
使用
提交时如何等待所有任务完成
ExecutorService.execute()。有一个函数叫做 awaitTermination
但是必须在其中提供超时。这不能保证
returns 所有的任务都已经完成了。有办法实现吗?
execute 方法没有 return 任何东西。您可以使用 submit 方法,该方法 return 是 Future 类型的结果。
Future<String> future =
executorService.submit(callableTask/runnableTask);
如果使用class ThreadPoolExecutor or any of its children you have a method there getActiveCount() 即returns 正在主动执行任务的线程数。因此,您可以轮询该方法,直到它变为 0,这意味着所有任务都已完成并且当前没有新任务正在执行。但是,如果某些任务卡住了怎么办?我认为在这种情况下你还必须给出一些超时以防止无限循环。
这个思路最大的好处就是不需要调用shutdown方法
如果您阅读 ExecutorService.awaitTermination
的 javadoc(或查看方法签名),您将看到它 returns 和 boolean
。此布尔值指示 Executor
是否终止。您可以使用该信息创建一个 while
循环以确定它是否已终止。
ExecutorService executor = ...
executor.shutdown(); // close the executor and don't accept new tasks
while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS) {}
像这样的事情会停止执行器并等待它终止并且所有任务都完成。
有几种方法。
你可以先调用 ExecutorService.shutdown and then ExecutorService.awaitTermination which returns:
true if this executor terminated and false if the timeout elapsed
before termination
所以:
There is a function called awaitTermination But a timeout has to be
provided in it. Which is not a guarantee that when this returns all
the tasks would have been finished. Is there a way to achieve this ?
您只需循环调用 awaitTermination
。
使用 awaitTermination
此实现的完整示例:
public class WaitForAllToEnd {
public static void main(String[] args) throws InterruptedException {
final int total_threads = 4;
ExecutorService executor = Executors.newFixedThreadPool(total_threads);
for(int i = 0; i < total_threads; i++){
executor.execute(parallelWork(100 + i * 100));
}
int count = 0;
// This is the relevant part
// Chose the delay most appropriate for your use case
executor.shutdown();
while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
System.out.println("Waiting "+ count);
count++;
}
}
private static Runnable parallelWork(long sleepMillis) {
return () -> {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
// Do Something
}
System.out.println("I am Thread : " + Thread.currentThread().getId());
};
}
}
使用 CountDownLatch
另一种选择是创建一个 CountDownLatch,其中 count
等于并行任务的数量。每个线程调用 countDownLatch.countDown();
,而 main 线程调用 countDownLatch.await();
.
此实现的完整示例:
public class WaitForAllToEnd {
public static void main(String[] args) throws InterruptedException {
final int total_threads = 4;
CountDownLatch countDownLatch = new CountDownLatch(total_threads);
ExecutorService executor = Executors.newFixedThreadPool(total_threads);
for(int i = 0; i < total_threads; i++){
executor.execute(parallelWork(100 + i * 100, countDownLatch));
}
countDownLatch.await();
System.out.println("Exit");
executor.shutdown();
}
private static Runnable parallelWork(long sleepMillis, CountDownLatch countDownLatch) {
return () -> {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
// Do Something
}
System.out.println("I am Thread : " + Thread.currentThread().getId());
countDownLatch.countDown();
};
}
}
使用循环屏障
另一种方法是使用 Cyclic Barrier
public class WaitForAllToEnd {
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
final int total_threads = 4;
CyclicBarrier barrier = new CyclicBarrier(total_threads+ 1);
ExecutorService executor = Executors.newFixedThreadPool(total_threads);
for(int i = 0; i < total_threads; i++){
executor.execute(parallelWork(100 + i * 100, barrier));
}
barrier.await();
System.out.println("Exit");
executor.shutdown();
}
private static Runnable parallelWork(long sleepMillis, CyclicBarrier barrier) {
return () -> {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
// Do Something
}
System.out.println("I am Thread : " + Thread.currentThread().getId());
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
// Do something
}
};
}
}
还有其他方法,但这些方法需要更改您的初始要求,即:
How to wait for all tasks to be completed when they are submitted
using ExecutorService.execute() .
使用
提交时如何等待所有任务完成
ExecutorService.execute()。有一个函数叫做 awaitTermination
但是必须在其中提供超时。这不能保证
returns 所有的任务都已经完成了。有办法实现吗?
execute 方法没有 return 任何东西。您可以使用 submit 方法,该方法 return 是 Future 类型的结果。
Future<String> future =
executorService.submit(callableTask/runnableTask);
如果使用class ThreadPoolExecutor or any of its children you have a method there getActiveCount() 即returns 正在主动执行任务的线程数。因此,您可以轮询该方法,直到它变为 0,这意味着所有任务都已完成并且当前没有新任务正在执行。但是,如果某些任务卡住了怎么办?我认为在这种情况下你还必须给出一些超时以防止无限循环。
这个思路最大的好处就是不需要调用shutdown方法
如果您阅读 ExecutorService.awaitTermination
的 javadoc(或查看方法签名),您将看到它 returns 和 boolean
。此布尔值指示 Executor
是否终止。您可以使用该信息创建一个 while
循环以确定它是否已终止。
ExecutorService executor = ...
executor.shutdown(); // close the executor and don't accept new tasks
while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS) {}
像这样的事情会停止执行器并等待它终止并且所有任务都完成。
有几种方法。
你可以先调用 ExecutorService.shutdown and then ExecutorService.awaitTermination which returns:
true if this executor terminated and false if the timeout elapsed before termination
所以:
There is a function called awaitTermination But a timeout has to be provided in it. Which is not a guarantee that when this returns all the tasks would have been finished. Is there a way to achieve this ?
您只需循环调用 awaitTermination
。
使用 awaitTermination
此实现的完整示例:
public class WaitForAllToEnd {
public static void main(String[] args) throws InterruptedException {
final int total_threads = 4;
ExecutorService executor = Executors.newFixedThreadPool(total_threads);
for(int i = 0; i < total_threads; i++){
executor.execute(parallelWork(100 + i * 100));
}
int count = 0;
// This is the relevant part
// Chose the delay most appropriate for your use case
executor.shutdown();
while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
System.out.println("Waiting "+ count);
count++;
}
}
private static Runnable parallelWork(long sleepMillis) {
return () -> {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
// Do Something
}
System.out.println("I am Thread : " + Thread.currentThread().getId());
};
}
}
使用 CountDownLatch
另一种选择是创建一个 CountDownLatch,其中 count
等于并行任务的数量。每个线程调用 countDownLatch.countDown();
,而 main 线程调用 countDownLatch.await();
.
此实现的完整示例:
public class WaitForAllToEnd {
public static void main(String[] args) throws InterruptedException {
final int total_threads = 4;
CountDownLatch countDownLatch = new CountDownLatch(total_threads);
ExecutorService executor = Executors.newFixedThreadPool(total_threads);
for(int i = 0; i < total_threads; i++){
executor.execute(parallelWork(100 + i * 100, countDownLatch));
}
countDownLatch.await();
System.out.println("Exit");
executor.shutdown();
}
private static Runnable parallelWork(long sleepMillis, CountDownLatch countDownLatch) {
return () -> {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
// Do Something
}
System.out.println("I am Thread : " + Thread.currentThread().getId());
countDownLatch.countDown();
};
}
}
使用循环屏障
另一种方法是使用 Cyclic Barrier
public class WaitForAllToEnd {
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
final int total_threads = 4;
CyclicBarrier barrier = new CyclicBarrier(total_threads+ 1);
ExecutorService executor = Executors.newFixedThreadPool(total_threads);
for(int i = 0; i < total_threads; i++){
executor.execute(parallelWork(100 + i * 100, barrier));
}
barrier.await();
System.out.println("Exit");
executor.shutdown();
}
private static Runnable parallelWork(long sleepMillis, CyclicBarrier barrier) {
return () -> {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
// Do Something
}
System.out.println("I am Thread : " + Thread.currentThread().getId());
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
// Do something
}
};
}
}
还有其他方法,但这些方法需要更改您的初始要求,即:
How to wait for all tasks to be completed when they are submitted using ExecutorService.execute() .