将 CompletableStage 动态地与最终的错误传播结合起来

Combine CompletableStage dynamically with eventually error propagation

我需要动态组合 CompletionStages(基于计算结果)以便不阻塞执行,最终我需要捕获操作期间可能出现的异常,以便小心地关闭执行。

我已经实现了如下内容:


public CompletableFuture<Data> getData() {

    final Data accumulator = new Data();

    CompletableFuture<Data> result = new CompletableFuture<>();

    CompletableStage exec = ... //starting execution

    exec.thenComposeAsync(
                    (res) -> process(accumulator, res)
            ).thenAccept(t -> result.complete(accumulator));
    return result;
  }

  private CompletionStage<Void> process(Data acc, Result res) {

    res.data().forEach(
            currData -> {
              add.addData(currData);
            }
    );
    if (res.hasMoreData()) {
      return res.fetchNextData().thenComposeAsync(
              (nextData) -> process(acc, nextData)
      );
    }

    return CompletableFuture.completedFuture(null);

  }

我不知道这是否是实施该解决方案的最佳方式,但如果一切正常,它会起作用。当 forEach 块由于任何原因出现异常时,问题就来了,错误不会传播回 getData 调用者所以我无法用 exceptionally 方法捕获它以停止我的以安全的方式应用。我想我做错了什么。

当传递给thenComposeAsync的函数异常失败时,thenComposeAsync返回的future会异常完成。这会导致由链式普通操作创建的期货也异常完成,而无需评估它们的功能。

该规则有三种例外情况,exceptionally, which is only evaluated after an exceptional completion, to produce a replacement value, whereas handle and whenComplete 在任何一种情况下都会进行评估。

所以当你想用回退值替换异常时,你可以使用

exec.thenComposeAsync(res -> process(accumulator, res))
    .exceptionally(throwable -> fallBack)
    .thenAccept(t -> result.complete(accumulator));

必须注意在 thenAccept 之前链接 exceptionally,否则在特殊情况下传递给 thenAccept 的函数将不会被计算。

当你想将异常传播到 result 未来时,你可以使用

exec.thenComposeAsync(res -> process(accumulator, res))
    .whenComplete((value, throwable) -> {
         if(throwable == null) result.complete(accumulator);
         else result.completeExceptionally(throwable);
    });

检查 throwablenull 以确定完成是否异常是至关重要的,因为 value 可以是 null 作为普通结果。即使在普通结果值永远不可能是 null 的场景中,也建议坚持惯用的解决方案,因为您不知道是否以及何时会重用代码。