在 Java 中嵌套 CompletionStages 以使内部块 运行 在外部块之前

Nesting CompletionStages in Java to make inner blocks run before outer blocks

我写了如下方法:

 public static CompletionStage<Tuple2<ObjectNode, String>> calculateTemplateTreeAndKeys(
  String content,
  RequestContext context,
  MetricsClient metricsClient,
  JdbcSession jdbcSession) {

AtomicReference<ObjectNode> templateTreeHolder = new AtomicReference<>();
templateTreeHolder.set(Json.rootNode());

return getTemplateIds(context, metricsClient, jdbcSession, content)
    .thenCompose(
        templateIds -> {
          templateIds.map(
              id ->
                  // do something and return CompletionStage<String>
                      .thenAccept(
                          tree -> {
                            templateTreeHolder.set(
                                (ObjectNode)
                                    templateTreeHolder.get().set(id, Json.readTree(tree)));

                            System.out.println(
                                "From inner function: " + templateTreeHolder.get());
                          }));
          return CompletableFuture.completedFuture(NotUsed.getInstance());
        })
    .thenApply(
        notUsed -> {
          String includedTemplateIdsStr =
              getKeysFromTemplateTree(templateTreeHolder.get()).toJavaList().toString();

          System.out.println("From outer function: " + templateTreeHolder.get());

          return Tuple.of(templateTreeHolder.get(), includedTemplateIdsStr);
        });

我希望内部块在之前处理和更新 templateTreeHolder .thenApply 被调用,因此 templateTreeHolder 会将正确的数据保存到 return。但是,.thenApply 块在内部 .thenAccept 块之前处理。

来自控制台的输出序列:

From outer function: {}
From inner function: {"f9406341-c62a-411a-9389-00a62bd63629":{}}

我不确定我在链接 CompletionStages 时做错了什么,请告诉我如何确保内部块在外部块之前完成?

您传递给 thenCompose 的函数是 return 已经完成的未来,即 return CompletableFuture.completedFuture(NotUsed.getInstance()); 允许从属阶段立即进行。这似乎与传递给 templateIds.map(…) 的函数的求值冲突,后者显然是异步发生的。

通常,您应该避免混合完成阶段和对副作用的依赖,尤其是当它们的异步评估未建模为先决条件完成阶段时。

但是如果你别无选择,你可以解决这个问题:

return getTemplateIds(context, metricsClient, jdbcSession, content)
    .thenCompose(
        templateIds -> {
          // create an initially uncompleted stage
          CompletableFuture<Object> subStage = new CompletableFuture<>(); 
          templateIds.map(
              id ->
                  // do something and return CompletionStage<String>
                      .thenAccept(
                          tree -> {
                            templateTreeHolder.set(
                                (ObjectNode)
                                    templateTreeHolder.get().set(id, Json.readTree(tree)));

                            System.out.println(
                                "From inner function: " + templateTreeHolder.get());
                            // complete when all work has been done
                            subStage.complete(null);
                          }));
          // use this stage for dependent actions
          return subStage;
        })
    .thenApply(
        notUsed -> {
          String includedTemplateIdsStr =
              getKeysFromTemplateTree(templateTreeHolder.get()).toJavaList().toString();

          System.out.println("From outer function: " + templateTreeHolder.get());

          return Tuple.of(templateTreeHolder.get(), includedTemplateIdsStr);
        });

在上面的代码中,如果您的操作在完成尝试之前因异常而失败,则 Future 将永远不会完成。一般模式是这样的:

CompletableFuture<Type> stage = new CompletableFuture<>();
…
try {
    code that will eventually call complete on stage
}
catch(Throwable t) {
    stage.completeExceptionally(t);
}

但是,当然,当应该完成阶段的代码也承担异步处理时,它会变得更复杂一些,所以你必须保护试图提交实际完成代码的代码,以及实际完成代码。

因此,更详细的内部代码版本如下所示:

CompletableFuture<Object> subStage = new CompletableFuture<>();
try {
    templateIds.map(
        id ->
            // do something and return CompletionStage<String>
            .thenAccept(
                tree -> {
                  templateTreeHolder.set(
                      (ObjectNode)
                          templateTreeHolder.get().set(id, Json.readTree(tree)));

                  System.out.println(
                      "From inner function: " + templateTreeHolder.get());
                })
            .whenComplete((v,t) -> {
                // complete when all work has been done
                if(t != null) subStage.completeExceptionally(t);
                else subStage.complete(v);
            }));
} catch(Throwable t) {
    subStage.completeExceptionally(t);
}
// use this stage for dependent actions
return subStage;

(也许,“做某事和 return CompletionStage” 也必须用 try { … } catch(Throwable t) { subStage.completeExceptionally(t); } 保护)