ExecutorService.awaitTermination() 永远不会超时

ExecutorService.awaitTermination() never times out

我正在尝试实现一个函数,其中可调用对象在规定的时间内完成或操作超时。我曾希望 ExecutorService.awaitTermination() 会这样做,但很惊讶地发现它没有。代码如下。 运行 永远不会完成。

public class Counter implements Callable<Void> {

    public static void main(String[] args) throws InterruptedException {
        final Map<String, Counter> map = new HashMap<>();
        map.put("", new Counter());
        final Map<String, Future<Void>> result = executeTasksInParallel(map);
        final Future<Void> voidFuture = result.get("");
        try {
            voidFuture.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Void call() throws Exception {
        for (long i = 0L; i < Long.MAX_VALUE; i++);
        return null;
    }

    public static <K, V> Map<K, Future<V>> executeTasksInParallel(final Map<K, ? extends Callable<V>> callablesById) throws InterruptedException {
        final Map<K, Future<V>> resultFuturesById = new HashMap<>();
        final ExecutorService executorService = Executors.newFixedThreadPool(callablesById.size());
        for (final Map.Entry<K, ? extends Callable<V>> callableByIdEntry : callablesById.entrySet()) {
            final K id = callableByIdEntry.getKey();
            final Callable<V> callable = callableByIdEntry.getValue();
            final Future<V> resultFuture = executorService.submit(callable);
            resultFuturesById.put(id, resultFuture);
        }
        executorService.shutdown();
        executorService.awaitTermination(5L, TimeUnit.SECONDS);
        return resultFuturesById;
    }
}

我是不是漏掉了什么?谢谢!

更新:

我尝试用下面的内容替换 try 块内容以避免 Future.get() 阻塞,但这也没有帮助

if (voidFuture.isDone()) {
   voidFuture.get();
}

awaintTermination 文档:

Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

它将在 5 秒后完成,但生成的线程仍在工作并且它不是守护线程,因此您的代码将继续工作直到子线程终止。

并且 voidFuture.get() 将阻塞直到返回。

awaitTermination() 不会尝试终止 运行 任务。 awaitTermination() 完成后,您应该调用 shutdownNow() 尝试杀死仍然存在的东西。

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdownNow()

  1. 按照 Joe C 指定的方式使用 shutdownNow()...
  2. ...但只有当您在 call() 中的代码允许它时,它才会起作用,例如通过检查当前线程是否被中断。参见例如this question and its answers for details. Occasionally you might go without this "cooperative" behavior in your loop if it calls (directly or indirectly) methods that handle interrupt requests properly by throwing an InterruptedException (examples are Thread.sleep(...), Object.wait(...), Future.get(...), blocking operations on a channel that implements InterruptibleChannel 等)。 编辑: ...如果抛出的 InterruptedException 没有被抑制。
  3. 是的,只有在未来 isDone() 时才调用 get()(因为它在不受您的 executorService 管理的主线程上)。

最终代码为

public class Counter implements Callable<Void> {

    public static void main(String[] args) throws InterruptedException {
        final Map<String, Counter> map = new HashMap<>();
        map.put("", new Counter());
        final Map<String, Future<Void>> result = executeTasksInParallel(map);
        final Future<Void> voidFuture = result.get("");
        try {
            if (voidFuture.isDone()) {
                voidFuture.get();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Void call() throws Exception {
        for (long i = 0L; i < Long.MAX_VALUE; i++) {
            if (Thread.currentThread().isInterrupted()) {
                Thread.currentThread().interrupt(); // restore interrupted flag
                return null;
            }
            /* or e.g. throw an exception */
        }
        return null;
    }

    public static <K, V> Map<K, Future<V>> executeTasksInParallel(
            final Map<K, ? extends Callable<V>> callablesById)
            throws InterruptedException {
        final Map<K, Future<V>> resultFuturesById = new HashMap<>();
        final ExecutorService executorService =
            Executors.newFixedThreadPool(callablesById.size());
        for (final Map.Entry<K, ? extends Callable<V>> callableByIdEntry : callablesById
            .entrySet()) {
            final K id = callableByIdEntry.getKey();
            final Callable<V> callable = callableByIdEntry.getValue();
            final Future<V> resultFuture = executorService.submit(callable);
            resultFuturesById.put(id, resultFuture);
        }
        executorService.shutdown();
        executorService.awaitTermination(5L, TimeUnit.SECONDS);
        executorService.shutdownNow();
        return resultFuturesById;
    }
}