Java CompletionStage 不会异常传播异常

Java CompletionStage not propagating exception to exceptionally

我在我的一个项目中使用异步编程。在 return 类型为 CompletionStage 的方法之一中,为了测试,我模拟了一个异常抛出,该异常抛出不会被 exceptionally() 捕获。下面是我根据用例创建的一些测试代码。

public class TestMain {
  
  TestService service = new TestService();
  
  public static void main(String[] args) throws InterruptedException {
    TestMain t = new TestMain();
    System.out.println("STARTED: THREAD " + Thread.currentThread().getName());
    t.execute("test").toCompletableFuture().join();
    System.out.println("DONE: THREAD " + Thread.currentThread().getName());
  }
  
  public CompletionStage<Void> execute(String s) {
    System.out.println("EXECUTE: THREAD " + Thread.currentThread().getName());
    return CompletableFuture.runAsync(() -> {
      System.out.println("EXECUTE-ASYNC STARTED: THREAD " + Thread.currentThread().getName());
      process(s);}).exceptionally(ex->{
        System.out.println("EXECUTE-ASYNC EXCEPTIONAL: THREAD " + Thread.currentThread().getName());
        System.err.println("EXECUTE-ASYNC EXCEPTIONAL-EXCEPTION " + ex.toString());
        if (ex.getCause()!=null)
          System.err.println("EXECUTE-ASYNC EXCEPTIONAL-EXCEPTION CAUSE " + ex.getCause().toString());
        throw ex instanceof CompletionException ? (CompletionException) ex : new CompletionException(ex);
      });
  }
  
  private CompletionStage<Void> process(String s) {
    System.out.println("EXECUTE-PROCESS: THREAD " + Thread.currentThread().getName());
//    throw new RuntimeException();
    return service.service(s);
  }

}

public class TestService {
  
  public CompletionStage<Void> service(String s) {
    System.out.println("SERVICE: THREAD " + Thread.currentThread().getName());
//    throw new RuntimeException();
    return preProcess(s).thenCompose(v -> {
      System.out.println("SERVICE-PRE PROCESS -> COMPOSE: THREAD " + Thread.currentThread().getName());
      return process(s);
    });
  }
  
  private CompletionStage<Void> preProcess(String s) {
    System.out.println("SERVICE-PRE PROCESS: THREAD " + Thread.currentThread().getName());
//    throw new RuntimeException();
    return CompletableFuture.completedFuture(null);
  }

  private CompletionStage<Void> process(String s) {
    System.out.println("SERVICE-PROCESS: THREAD " + Thread.currentThread().getName());
    throw new RuntimeException();
//    return CompletableFuture.completedFuture(null);
  }
  
}

当我 运行 上面给出的代码输出显示 TestMain.execute() 中的 exceptionally() 没有被调用。

实际输出(当从 TestService.process() 抛出异常时):

STARTED: THREAD main
EXECUTE: THREAD main
EXECUTE-ASYNC STARTED: THREAD ForkJoinPool.commonPool-worker-1
EXECUTE-PROCESS: THREAD ForkJoinPool.commonPool-worker-1
SERVICE: THREAD ForkJoinPool.commonPool-worker-1
SERVICE-PRE PROCESS: THREAD ForkJoinPool.commonPool-worker-1
SERVICE-PRE PROCESS -> COMPOSE: THREAD ForkJoinPool.commonPool-worker-1
SERVICE-PROCESS: THREAD ForkJoinPool.commonPool-worker-1
DONE: THREAD main

但是,如果我在注释代码标记的任何其他地方抛出异常,exceptionally() 就会被调用。下面是一个这样的例子。

实际输出(当从 TestService.preProcess() 抛出异常时): 期望上面的输出与此类似。

STARTED: THREAD main
EXECUTE: THREAD main
EXECUTE-ASYNC STARTED: THREAD ForkJoinPool.commonPool-worker-1
EXECUTE-PROCESS: THREAD ForkJoinPool.commonPool-worker-1
SERVICE: THREAD ForkJoinPool.commonPool-worker-1
SERVICE-PRE PROCESS: THREAD ForkJoinPool.commonPool-worker-1
EXECUTE-ASYNC EXCEPTIONAL: THREAD main
EXECUTE-ASYNC EXCEPTIONAL-EXCEPTION java.util.concurrent.CompletionException: java.lang.RuntimeException
EXECUTE-ASYNC EXCEPTIONAL-EXCEPTION CAUSE java.lang.RuntimeException
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.RuntimeException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1643)
    at java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1632)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: java.lang.RuntimeException
    at in.TestService.preProcess(TestService.java:19)
    at in.TestService.service(TestService.java:11)
    at in.TestMain.process(TestMain.java:34)
    at in.TestMain.lambda[=13=](TestMain.java:22)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
    ... 5 more

有人可以帮忙解释为什么会这样吗?我需要在 TestMain.execute().exceptionally() 中捕获抛出的异常,以始终实现我想在此处使用的重试逻辑。

提前致谢。

您正在调用一个 returns 一个 CompletionStage 但忽略返回值的方法:

CompletableFuture.runAsync(() -> {
  System.out.println("EXECUTE-ASYNC STARTED: THREAD "+Thread.currentThread().getName());
  process(s); // return value ignored
})

如果函数/Runnable抛出异常,由CompletableFuture.runAsync(…)创建的阶段将异常完成。当您将 throw new RuntimeException(); 放在 process 方法中时会发生什么。但在另一种情况下,它不会抛出异常,而是 returns 一个被调用者忽略的失败阶段。

您可以将代码更改为

CompletableFuture.runAsync(() -> {
  System.out.println("EXECUTE-ASYNC STARTED: THREAD "+Thread.currentThread().getName());
  process(s).toCompletableFuture().join();
})

不要忽略返回的阶段,或者最好使用您已经在其他地方使用过的模式:

CompletableFuture.completedFuture(null).thenComposeAsync(v -> {
  System.out.println("EXECUTE-ASYNC STARTED: THREAD "+Thread.currentThread().getName());
  return process(s);
})