如何在 Java 的 ExecutorService 中检索和处理异常

How to retrieve and handle exceptions in Java's ExecutorService

我正在尝试找出一种在多线程设置中处理异常的方法。我想并行执行某些任务,每个任务都可能抛出一个我需要做出反应的异常(基本上,通过将失败的任务放回执行队列)。但是,从线程中实际获取异常的唯一方法似乎是创建一个 Future 并调用其 get() 方法。但是,这实质上将调用变成了同步调用。

也许一些代码可以说明这一点:

ExecutorService executor = Executors.newFixedThreadPool(nThreads);
Task task = taskQueue.poll(); // let's assume that task implements Runnable
try {
  executor.execute(task);
}
catch(Exception ex) {
  // record the failed task, so that it can be re-added to the queue 
} 

然而,在这种情况下,所有任务都已启动,但异常似乎并未在此 catch 块中捕获。

另一种方法是使用 Future 而不是线程并检索其结果:

try {
  Future<?> future = executor.submit(task);
  future.get();
}
...

在这种情况下,在 catch 块中可以很好地捕获异常,但代价是必须等到此操作完成。因此,任务按需要顺序执行,而不是并行执行。

我错过了什么?如何捕获每个任务的异常并对它们做出反应?

您可以在一个循环中触发所有任务,在另一个循环中 check/await/retry:

Map<Future<?>, Task> futures = new HashMap<Future<?>, Task>()
while(!taskQueue.isEmpty()){
    Task task = taskQueue.poll();
    Future<?> future = executor.submit(task);
    futures.put(future, task);
}

for(Map.Entry<Future<?>, Task> entry : futures.entrySet()){

    try {
        entry.getKey().get();
    }
    catch(ExecutionException ex) {
        // record the failed task, so that it can be re-added to the queue 
        // you should add a retry counter because you want to prevent endless loops
        taskQueue.add(entry.getValue());
    }
    catch(InterrupredException ex){ 
        // thread interrupted, exit
        Thread.interrupt();
        return;
    }
}

HTH,马克