WorkStealingPool 和 ThreadPoolExecutor 在与 CompletableFuture 一起使用时会产生不同的结果

WorkStealingPool and ThreadPoolExecutor yields different results when used with CompletableFuture

执行以下代码时:

ExecutorService executorService = Executors.newWorkStealingPool(20);

Function<String, CompletableFuture<String>> requestTask =
        url -> CompletableFuture.supplyAsync(() -> {
                    System.out.println("Request " + requestCount++ + " was sent");
                    HttpClient.get(url);
                    return url;
                    }, executorService);

Function<String, String> extractName = s -> s.replaceAll("(https|http|://|\.com|www\.|\.io)", "");

CompletableFuture[] futures = urls.stream() // urls list contains 14 urls
        .map(requestTask)
        .map(future -> future.thenApply(extractName))
        .map(future -> future.thenAccept(System.out::println))
        .toArray(CompletableFuture[]::new);

CompletableFuture.allOf(futures);
executorService.shutdown();

结果如下:

Request 0 was sent
Request 1 was sent
Request 2 was sent
Request 3 was sent
Request 4 was sent
Request 5 was sent
Process finished with exit code 0

但是,当 Executors.newWorkStealingPool(20) 替换为 Executors.newFixedThreadPool(20) 时,所有请求都会被发送。这种行为的原因是什么?

并非所有请求都已发送,因为(剧透警报!)JVM 只是终止。

如您所知,the conditions for JVM termination 是:

either of the following occurs:

  • The exit method of class Runtime has been called and the security manager has permitted the exit operation to take place.
  • All threads that are not daemon threads have died, either by returning from the call to the run method or by throwing an exception that propagates beyond the run method.

显然不是第一种,那肯定是第二种

首先要注意的是,您让 main() 方法退出:

  • 你调用 CompletableFuture.allOf() 但你没有对结果做任何事情所以它不会阻塞(没有 join() 调用);
  • 调用 ExecutorService.shutDown() 只会告诉执行程序关闭,它不会等待它。

最初,主线程是唯一的非守护线程,所以这应该足以让 JVM 退出。但这就是 2 位执行者发挥作用的地方:

  • newFixedThreadPool()ThreadPoolExecutor 实现,它使用 Executors.defaultThreadFactory(),创建非守护线程;
  • newWorkStealingPool()ForkJoinPool 实现,它在它创建的所有线程上调用 setDaemon(true)

不幸的是,它没有记录在案,但基本上,这可以归结为

您的问题有两种可能的解决方案:

  • allOf()
  • 之后调用 join()
  • shutdown()
  • 之后在执行器上调用 awaitTermination()

¹ , this was not documented in Java 8, but it is now since Java 9.