如何检查所有任务是否已完成(正常或突然)?
How can I check if all tasks have been completed (normally or abruptly)?
我有以下 class:
public class Service{
private static final ExecutorService executor = Executors.newFixedThreadPool(4);
public synchronized void execute(Collection<Runnable> runs){
for(Runnable r: runs){
executor.execute(r);
}
}
public boolean isComplete(){
//should return true iff all tasks applied by the execute(Collection<Runnable> runs)
//method're finished their execution (normally or abruptly,
//it doesn matter)
}
}
如何实现 isComplete()
方法。我需要检查当前是否有正在进行的任务。如果执行器被清除(所有任务都完成)那么该方法应该 return true,否则 return false.
鉴于您正在使用 ExecutorService
,您可以使用 submit
而不是 execute
,并将返回的 Future
存储在列表中。然后在 isComplete()
中遍历所有 Future
并在每个 isDone()
上调用。 (如果需要,此方法还可以让您通过 Future
s 取消提交的任务)。
例如† :
class Service{
private static final ExecutorService executor = Executors.newFixedThreadPool(4);
private List<Future<?>> futures;
public void execute(Collection<Runnable> runs){
futures = runs.stream()
.map(r -> executor.submit(r))
.collect(Collectors.toList());
}
public boolean isComplete(){
for (Future<?> future : futures)
if (!future.isDone()) return false;
return true;
}
}
根据您的用例,您可以通过从 futures
列表中删除项目来获得更好的性能,但您(可能)需要同步 isComplete
方法:
public synchronized boolean isComplete(){
Iterator<Future<?>> itr = futures.iterator();
while (itr.hasNext()) {
if (itr.next().isDone()) itr.remove();
else return false;
}
return true;
}
† 编写此代码示例时,它假定您只会对 [=23= 的每个实例调用一次 execute
],所以它不需要是 synchronized
。如果您将在每个 Service
实例上有多个并发调用者 execute
,请注意,这将在每次调用 execute
时替换 futures
列表。您可以通过使 class 仅供一次性使用或附加到期货来处理。这完全取决于您的用例。
您可以调用shutdown()
方法来要求执行器终止所有线程并关闭池。然后调用 isTerminated()
如果所有线程都终止,它将 return 为真。
如果你想阻塞执行,你可以使用awaitTerminationMethod()
,最后shutDownNow()
不管线程是否完成执行都会终止池。
CountDownLatch 解决了我的问题。
ExecutorService WORKER_THREAD_POOL
= Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
WORKER_THREAD_POOL.submit(() -> {
try {
// ...
latch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// wait for the latch to be decremented by the two remaining threads
latch.await();
我有以下 class:
public class Service{
private static final ExecutorService executor = Executors.newFixedThreadPool(4);
public synchronized void execute(Collection<Runnable> runs){
for(Runnable r: runs){
executor.execute(r);
}
}
public boolean isComplete(){
//should return true iff all tasks applied by the execute(Collection<Runnable> runs)
//method're finished their execution (normally or abruptly,
//it doesn matter)
}
}
如何实现 isComplete()
方法。我需要检查当前是否有正在进行的任务。如果执行器被清除(所有任务都完成)那么该方法应该 return true,否则 return false.
鉴于您正在使用 ExecutorService
,您可以使用 submit
而不是 execute
,并将返回的 Future
存储在列表中。然后在 isComplete()
中遍历所有 Future
并在每个 isDone()
上调用。 (如果需要,此方法还可以让您通过 Future
s 取消提交的任务)。
例如† :
class Service{
private static final ExecutorService executor = Executors.newFixedThreadPool(4);
private List<Future<?>> futures;
public void execute(Collection<Runnable> runs){
futures = runs.stream()
.map(r -> executor.submit(r))
.collect(Collectors.toList());
}
public boolean isComplete(){
for (Future<?> future : futures)
if (!future.isDone()) return false;
return true;
}
}
根据您的用例,您可以通过从 futures
列表中删除项目来获得更好的性能,但您(可能)需要同步 isComplete
方法:
public synchronized boolean isComplete(){
Iterator<Future<?>> itr = futures.iterator();
while (itr.hasNext()) {
if (itr.next().isDone()) itr.remove();
else return false;
}
return true;
}
† 编写此代码示例时,它假定您只会对 [=23= 的每个实例调用一次 execute
],所以它不需要是 synchronized
。如果您将在每个 Service
实例上有多个并发调用者 execute
,请注意,这将在每次调用 execute
时替换 futures
列表。您可以通过使 class 仅供一次性使用或附加到期货来处理。这完全取决于您的用例。
您可以调用shutdown()
方法来要求执行器终止所有线程并关闭池。然后调用 isTerminated()
如果所有线程都终止,它将 return 为真。
如果你想阻塞执行,你可以使用awaitTerminationMethod()
,最后shutDownNow()
不管线程是否完成执行都会终止池。
CountDownLatch 解决了我的问题。
ExecutorService WORKER_THREAD_POOL
= Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
WORKER_THREAD_POOL.submit(() -> {
try {
// ...
latch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// wait for the latch to be decremented by the two remaining threads
latch.await();