什么会导致 ExecutorService.invokeAll() 抛出 InterruptedException?

What can cause ExecutorService.invokeAll() to throw an InterruptedException?

javadoc 说 invokeAll(Collection<> callables) 抛出

InterruptedException - if interrupted while waiting, in which case unfinished tasks are cancelled

但是没有关于为什么通话会被中断的文档。我的程序会这样做 - 但很少见,而且我无法编写会导致它发生的测试。

我正在使用没有超时的单参数方法。

public class ParallelUtil<T> {

    public interface Task<T> {
        public void execute(T data) throws Exception;
    }

    public void execute(final Task<T> task, Collection<T> targets) {
        ExecutorService executor = null;
        try {
            executor = Executors.newFixedThreadPool(20);
            execute(task, targets, executor);
        } finally {
            if (executor != null) {
                try {
                    executor.shutdown();
                } catch (Exception e) {}
            }
        }
    }

    private void execute(final Task<T> task, Collection<T> targets, ExecutorService executor) {
        List<Callable<Void>> tasks = new ArrayList<>();
        ...
        try {
            executor.invokeAll(tasks);
        } catch (Exception e) {
            // Here we get InterruptedException - for no reason?
            // With some of the tasks left undone!
        }
    }
}

InterruptedException 从 java.util.concurrent.FutureTask.awaitDone(FutureTask.java:40‌​0)

抛出(至少在某些情况下)

可以同时有很多这样的 ParallelUtils 运行(从不同的线程启动),但是正如您所看到的,每个调用都会创建自己的 ExecutorService,所以它们不应该互相惹事。

(作为一个附带问题,我可以为所有调用使用共享池而不让 invokeAll 调用相互混淆吗?)

What can cause ExecutorService.invokeAll() to throw an InterruptedException?

查看 shutdownNow() 的 javadoc:

There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. For example, typical implementations will cancel via Thread.interrupt(), so any task that fails to respond to interrupts may never terminate.

由于 invokeAll 等到所有任务都完成后,从另一个线程调用 shutdownNow() 是中断 invokeAll 调用的一种方式。


但是,在代码中 正如您向我们展示的那样,没有对 shutdownNow() 的调用,也没有明显的方式让执行程序服务对象泄漏到另一个线程。

您的代码(或其他库代码)中的某些内容也可能正在调用 Thread.interrupt。例如,其中一项任务可能正在对自己执行此操作。

There can be many of these ParallelUtils running at the same time (launched from different threads)

显然,这些线程中至少有一个直接通过 Thread.interrupt() 或通过例如间接中断。 Future.cancel(true)ExecutorService.shutdownNow().

重现此中断的示例:

class Sleep implements ParallelUtil.Task<Integer> {
  @Override
  public void execute(Integer data) throws Exception {
    Thread.sleep(Long.MAX_VALUE);
  }
}

class InterruptInvokeAll {

  public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    executorService.submit(
        () -> {
          ParallelUtil<Integer> parallelUtil = new ParallelUtil<>();
          parallelUtil.execute(new Sleep(), Arrays.asList(1));
        });

    executorService.shutdownNow(); // indirectly interrupts thread that calls executor.invokeAll
   }    
}