自己的ExecutorService用来创建CompletableFuture不会终止
Own ExecutorService used to create CompletableFuture does not terminate
我正在尝试使用我自己的 ExecutorService
创建一组 CompletableFuture
来链接多个流程步骤。这些步骤可能会引发异常。
当他们这样做时,在我看来 ExecutorService
中的线程并未释放,尽管我正在尝试处理这种情况。
class Scratch {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
AtomicInteger counter = new AtomicInteger();
Supplier<?> throwingException = () -> { throw new RuntimeException("throw " + counter.incrementAndGet()); };
Function<String, CompletableFuture<?>> process =
url -> CompletableFuture.supplyAsync(throwingException, executor)
.exceptionally(Scratch::log);
var collect = IntStream.range(1, 10).mapToObj(i -> "url" + i)
.map(process)
.toArray(CompletableFuture[]::new);
final CompletableFuture<Void> together = CompletableFuture.allOf(collect);
System.out.println("joining");
together.exceptionally(Scratch::log).join();
System.out.println("finished");
if (executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("exiting cleanly");
} else {
System.out.println("not terminated");
}
executor.submit(() -> System.out.println("still executing"));
}
static <T> T log(Throwable t) {
System.out.println(t.getMessage());
return null;
}
}
输出是
java.lang.RuntimeException: throw 1
joining
java.lang.RuntimeException: throw 2
java.lang.RuntimeException: throw 3
java.lang.RuntimeException: throw 4
java.lang.RuntimeException: throw 5
java.lang.RuntimeException: throw 6
java.lang.RuntimeException: throw 7
java.lang.RuntimeException: throw 8
java.lang.RuntimeException: throw 9
finished
not terminated
由此启动的进程也没有终止(这是我注意到的方式)。
在我看来,这应该意味着此时 ExecutorService
中没有剩余线程,但事实并非如此;如果我们降低线程池容量,它仍然会 运行 所有提交的任务,如果我们在失败终止后添加另一个提交(例如 executor.submit(() -> System.out.println("still executing"));
),它将被执行。
如果我们不将自己的 ExecutorService
传递给 CompletableFutre::supplyAsync
,进程将按预期终止。
我也尝试了其他版本的异常状态处理(比如使用 together.whenComplete()
),但结果相同。
为什么会发生这种情况,如何确保 ExecutorService
正确终止?
编辑:我意识到这不是导致问题的异常,这将发生在使用您自己的执行程序服务提供给 CompletableFuture
的任何任务中,考虑到 Eugene 的回复,这完全有意义。我正在更改问题标题。
这里发生了两件事。第一个是当你执行 而没有 显式 Executor
时,你的操作 运行 在 ForkJoinPool
中。该池使用守护线程,它不会停止 VM 退出。所以当你的 main
结束时,VM 就存在了。
第二点在awaitTermination
的文档中,其实是:
Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.
由于您没有调用 shutDown
并且该池创建了 非守护进程 线程,因此进程不会退出。
我正在尝试使用我自己的 ExecutorService
创建一组 CompletableFuture
来链接多个流程步骤。这些步骤可能会引发异常。
当他们这样做时,在我看来 ExecutorService
中的线程并未释放,尽管我正在尝试处理这种情况。
class Scratch {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
AtomicInteger counter = new AtomicInteger();
Supplier<?> throwingException = () -> { throw new RuntimeException("throw " + counter.incrementAndGet()); };
Function<String, CompletableFuture<?>> process =
url -> CompletableFuture.supplyAsync(throwingException, executor)
.exceptionally(Scratch::log);
var collect = IntStream.range(1, 10).mapToObj(i -> "url" + i)
.map(process)
.toArray(CompletableFuture[]::new);
final CompletableFuture<Void> together = CompletableFuture.allOf(collect);
System.out.println("joining");
together.exceptionally(Scratch::log).join();
System.out.println("finished");
if (executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("exiting cleanly");
} else {
System.out.println("not terminated");
}
executor.submit(() -> System.out.println("still executing"));
}
static <T> T log(Throwable t) {
System.out.println(t.getMessage());
return null;
}
}
输出是
java.lang.RuntimeException: throw 1
joining
java.lang.RuntimeException: throw 2
java.lang.RuntimeException: throw 3
java.lang.RuntimeException: throw 4
java.lang.RuntimeException: throw 5
java.lang.RuntimeException: throw 6
java.lang.RuntimeException: throw 7
java.lang.RuntimeException: throw 8
java.lang.RuntimeException: throw 9
finished
not terminated
由此启动的进程也没有终止(这是我注意到的方式)。
在我看来,这应该意味着此时 ExecutorService
中没有剩余线程,但事实并非如此;如果我们降低线程池容量,它仍然会 运行 所有提交的任务,如果我们在失败终止后添加另一个提交(例如 executor.submit(() -> System.out.println("still executing"));
),它将被执行。
如果我们不将自己的 ExecutorService
传递给 CompletableFutre::supplyAsync
,进程将按预期终止。
我也尝试了其他版本的异常状态处理(比如使用 together.whenComplete()
),但结果相同。
为什么会发生这种情况,如何确保 ExecutorService
正确终止?
编辑:我意识到这不是导致问题的异常,这将发生在使用您自己的执行程序服务提供给 CompletableFuture
的任何任务中,考虑到 Eugene 的回复,这完全有意义。我正在更改问题标题。
这里发生了两件事。第一个是当你执行 而没有 显式 Executor
时,你的操作 运行 在 ForkJoinPool
中。该池使用守护线程,它不会停止 VM 退出。所以当你的 main
结束时,VM 就存在了。
第二点在awaitTermination
的文档中,其实是:
Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.
由于您没有调用 shutDown
并且该池创建了 非守护进程 线程,因此进程不会退出。