为什么我的 CompletableFuture 代码 运行 在 Java 8 而不是在 Java 11?

Why does my CompletableFuture code run in Java 8 but not in Java 11?

为什么此代码块在 Java 8 和 Java 11 中的行为不同?

private static String test2() {
    CompletableFuture
            .runAsync(() -> IntStream.rangeClosed(1, 20).forEach(x -> {
                try {
                    Thread.sleep(500);
                    System.out.println(x);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }));

    return "Finish";
}

我希望它打印完成,然后以 500 毫秒的间隔打印从 1 到 20 的数字,然后停止执行,它在 Java 8.

中正常工作

当我 运行 在 Java 11 上使用完全相同的方法时,它打印了 Finish 并终止而没有调用 运行Async(...) 代码。我设法通过添加这样的 ExecutorService 来启动它

private static String test2() {

    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    CompletableFuture
            .runAsync(() -> IntStream.rangeClosed(1, 10).forEach(x -> {
                try {
                    Thread.sleep(500);
                    System.out.println(x);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }), executorService);

    return "Finish";
}

现在它被执行了,但没有完成;它达到 10,之后只是坐着没有完成。我想出了如何通过在 return 之前调用 executorService.shutdown(); 来停止执行,但我 100% 确定这种方法是错误的,因为通常我会为许多方法使用相同的 executorService,如果我关闭它下来其他方法也会执行失败。

这里 Java 8 和 Java 11 之间发生了什么变化,为什么我现在必须添加一个显式执行程序服务,最重要的是我怎样才能正确地完成方法执行?

如果必须 CompletableFuture

private static String test2() {
  EventQueue.invokeLater(new Runnable() {
    @Override
    public void run() {
      try {
        CompletableFuture.runAsync( () -> IntStream.rangeClosed(1, 20).forEach(x -> {
          try {
            Thread.sleep(500);
          }
          catch (InterruptedException e) {
            e.printStackTrace();
          }
          System.err.println(x);
        })).get();
      }
      catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
      }
    }});
    return "Finish";
}

我想你最好选择 CompletableFuture

private static String test2() {
    Runnable runner = new Runnable() {
      @Override
      public void run() {
          IntStream.rangeClosed(1, 20).forEach(x -> {
            try {
              Thread.sleep(500);
            }
            catch (InterruptedException e) {
              e.printStackTrace();
            }
            System.out.println(x);
          });
      }
  };
  Executors.newCachedThreadPool().execute(runner);
  return "Finish";
}

我会说您的程序的行为符合预期。 runAsync 将 运行 公共分叉连接池中提供的操作,除非您提供执行程序。 在任何情况下,如果您不等待可完成的未来完成,方法 test2 会立即打印“完成”和 returns.

“Finish”可以随时打印:您可能会看到“Finish, 1, ...”或“1, Finish, 2...”等...这是一个竞争条件。

当你不使用执行器时,因为公共池中的线程是守护线程,你的程序可能随时退出,不会等待预定的操作完成。

当您使用带有非守护线程的执行程序(通常是默认设置)时,程序不会退出,直到执行程序关闭。

确保您的程序在您的操作完成之前不退出的唯一方法是等待可完成的未来完成,按照其他答案中的建议调用 get or join

TL;DR - 在调用 CompletableFuture.runAsync 之后和代码末尾添加 ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.SECONDS); 这样 System.exit 就不会停止运行。这样你就会得到你的行为。


更长的答案:

好的,首先,我在 Oracles java 8、OpenJDK 8 和 OpenJDK 11 中尝试了这两个示例。整体行为一致,所以我的回答是 没有 在不同 java 版本的这些实现中发生了变化,这会导致这种差异。在两个示例中,您看到的行为与Java告诉您的行为一致。

来自 CompletableFuture.runAsync

的文档

Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() after it runs the given action.

好的...让我们看看 ForkJoinPool.commonPool 会告诉我们什么(强调我的):

Returns the common pool instance. This pool is statically constructed; its run state is unaffected by attempts to shutdown() or shutdownNow(). However this pool and any ongoing processing are automatically terminated upon program System.exit(int). Any program that relies on asynchronous task processing to complete before program termination should invoke commonPool().awaitQuiescence, before exit.

啊哈,这就是为什么我们在使用公共池时看不到倒计时的原因,这是因为公共池将在系统退出时终止,这正是我们从方法 return 时发生的情况并退出程序(假设你的例子真的像你展示的那么简单......就像在 main 中调用一个方法......无论如何)

那么自定义执行器为什么会起作用呢?因为,正如您已经注意到的那样,该执行者尚未终止。后台还有一段代码运行,虽然闲着,那Java也没有停手的力量


那么我们现在可以做什么?

一个 选项是执行我们自己的执行程序并在完成后将其关闭,就像您建议的那样。我认为这种方法 并没有 使用起来那么糟糕。

第二个选项是遵循java文档所说的内容。

Any program that relies on asynchronous task processing to complete before program termination should invoke commonPool().awaitQuiescence, before exit.

public boolean awaitQuiescence​(long timeout, TimeUnit unit)

If called by a ForkJoinTask operating in this pool, equivalent in effect to ForkJoinTask.helpQuiesce(). Otherwise, waits and/or attempts to assist performing tasks until this pool isQuiescent() or the indicated timeout elapses.

因此我们可以调用该方法并为公共池中的所有公共进程指定超时。我的意见是,这有点 业务特定,因为现在你必须回答这个问题 - 现在超时到底应该是什么??

第三个选项是使用CompletableFutures的力量并将这个runAsync方法提升到一个变量:

CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> ...
...
...
bla bla bla code bla bla bla
...
...
voidCompletableFuture.join();
// or if you want to handle exceptions, use get
voidCompletableFuture.get();

然后在你需要的时候,你 join()/get() 任何你需要的东西作为 return 值。我最喜欢这个,因为代码像这样最干净易懂。我也可以 链接 我想要的所有 CF,并用它们做一些时髦的事情。


的情况下,您不需要 return 值,也不需要做任何其他事情,只需要一个简单的 return一个字符串和从 1 到 20 计数的异步处理,然后只需将 ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.SECONDS); 推到你方便的地方并给它一些荒谬的超时,因此 保证 你将退出所有空闲 个进程。