在 Java 中嵌套 CompletionStages 以使内部块 运行 在外部块之前
Nesting CompletionStages in Java to make inner blocks run before outer blocks
我写了如下方法:
public static CompletionStage<Tuple2<ObjectNode, String>> calculateTemplateTreeAndKeys(
String content,
RequestContext context,
MetricsClient metricsClient,
JdbcSession jdbcSession) {
AtomicReference<ObjectNode> templateTreeHolder = new AtomicReference<>();
templateTreeHolder.set(Json.rootNode());
return getTemplateIds(context, metricsClient, jdbcSession, content)
.thenCompose(
templateIds -> {
templateIds.map(
id ->
// do something and return CompletionStage<String>
.thenAccept(
tree -> {
templateTreeHolder.set(
(ObjectNode)
templateTreeHolder.get().set(id, Json.readTree(tree)));
System.out.println(
"From inner function: " + templateTreeHolder.get());
}));
return CompletableFuture.completedFuture(NotUsed.getInstance());
})
.thenApply(
notUsed -> {
String includedTemplateIdsStr =
getKeysFromTemplateTree(templateTreeHolder.get()).toJavaList().toString();
System.out.println("From outer function: " + templateTreeHolder.get());
return Tuple.of(templateTreeHolder.get(), includedTemplateIdsStr);
});
我希望内部块在之前处理和更新 templateTreeHolder
.thenApply 被调用,因此 templateTreeHolder 会将正确的数据保存到 return。但是,.thenApply 块在内部 .thenAccept 块之前处理。
来自控制台的输出序列:
From outer function: {}
From inner function: {"f9406341-c62a-411a-9389-00a62bd63629":{}}
我不确定我在链接 CompletionStages 时做错了什么,请告诉我如何确保内部块在外部块之前完成?
您传递给 thenCompose
的函数是 return 已经完成的未来,即 return CompletableFuture.completedFuture(NotUsed.getInstance());
允许从属阶段立即进行。这似乎与传递给 templateIds.map(…)
的函数的求值冲突,后者显然是异步发生的。
通常,您应该避免混合完成阶段和对副作用的依赖,尤其是当它们的异步评估未建模为先决条件完成阶段时。
但是如果你别无选择,你可以解决这个问题:
return getTemplateIds(context, metricsClient, jdbcSession, content)
.thenCompose(
templateIds -> {
// create an initially uncompleted stage
CompletableFuture<Object> subStage = new CompletableFuture<>();
templateIds.map(
id ->
// do something and return CompletionStage<String>
.thenAccept(
tree -> {
templateTreeHolder.set(
(ObjectNode)
templateTreeHolder.get().set(id, Json.readTree(tree)));
System.out.println(
"From inner function: " + templateTreeHolder.get());
// complete when all work has been done
subStage.complete(null);
}));
// use this stage for dependent actions
return subStage;
})
.thenApply(
notUsed -> {
String includedTemplateIdsStr =
getKeysFromTemplateTree(templateTreeHolder.get()).toJavaList().toString();
System.out.println("From outer function: " + templateTreeHolder.get());
return Tuple.of(templateTreeHolder.get(), includedTemplateIdsStr);
});
在上面的代码中,如果您的操作在完成尝试之前因异常而失败,则 Future 将永远不会完成。一般模式是这样的:
CompletableFuture<Type> stage = new CompletableFuture<>();
…
try {
code that will eventually call complete on stage
}
catch(Throwable t) {
stage.completeExceptionally(t);
}
但是,当然,当应该完成阶段的代码也承担异步处理时,它会变得更复杂一些,所以你必须保护试图提交实际完成代码的代码,以及实际完成代码。
因此,更详细的内部代码版本如下所示:
CompletableFuture<Object> subStage = new CompletableFuture<>();
try {
templateIds.map(
id ->
// do something and return CompletionStage<String>
.thenAccept(
tree -> {
templateTreeHolder.set(
(ObjectNode)
templateTreeHolder.get().set(id, Json.readTree(tree)));
System.out.println(
"From inner function: " + templateTreeHolder.get());
})
.whenComplete((v,t) -> {
// complete when all work has been done
if(t != null) subStage.completeExceptionally(t);
else subStage.complete(v);
}));
} catch(Throwable t) {
subStage.completeExceptionally(t);
}
// use this stage for dependent actions
return subStage;
(也许,“做某事和 return CompletionStage” 也必须用 try { … } catch(Throwable t) { subStage.completeExceptionally(t); }
保护)
我写了如下方法:
public static CompletionStage<Tuple2<ObjectNode, String>> calculateTemplateTreeAndKeys(
String content,
RequestContext context,
MetricsClient metricsClient,
JdbcSession jdbcSession) {
AtomicReference<ObjectNode> templateTreeHolder = new AtomicReference<>();
templateTreeHolder.set(Json.rootNode());
return getTemplateIds(context, metricsClient, jdbcSession, content)
.thenCompose(
templateIds -> {
templateIds.map(
id ->
// do something and return CompletionStage<String>
.thenAccept(
tree -> {
templateTreeHolder.set(
(ObjectNode)
templateTreeHolder.get().set(id, Json.readTree(tree)));
System.out.println(
"From inner function: " + templateTreeHolder.get());
}));
return CompletableFuture.completedFuture(NotUsed.getInstance());
})
.thenApply(
notUsed -> {
String includedTemplateIdsStr =
getKeysFromTemplateTree(templateTreeHolder.get()).toJavaList().toString();
System.out.println("From outer function: " + templateTreeHolder.get());
return Tuple.of(templateTreeHolder.get(), includedTemplateIdsStr);
});
我希望内部块在之前处理和更新 templateTreeHolder .thenApply 被调用,因此 templateTreeHolder 会将正确的数据保存到 return。但是,.thenApply 块在内部 .thenAccept 块之前处理。
来自控制台的输出序列:
From outer function: {}
From inner function: {"f9406341-c62a-411a-9389-00a62bd63629":{}}
我不确定我在链接 CompletionStages 时做错了什么,请告诉我如何确保内部块在外部块之前完成?
您传递给 thenCompose
的函数是 return 已经完成的未来,即 return CompletableFuture.completedFuture(NotUsed.getInstance());
允许从属阶段立即进行。这似乎与传递给 templateIds.map(…)
的函数的求值冲突,后者显然是异步发生的。
通常,您应该避免混合完成阶段和对副作用的依赖,尤其是当它们的异步评估未建模为先决条件完成阶段时。
但是如果你别无选择,你可以解决这个问题:
return getTemplateIds(context, metricsClient, jdbcSession, content)
.thenCompose(
templateIds -> {
// create an initially uncompleted stage
CompletableFuture<Object> subStage = new CompletableFuture<>();
templateIds.map(
id ->
// do something and return CompletionStage<String>
.thenAccept(
tree -> {
templateTreeHolder.set(
(ObjectNode)
templateTreeHolder.get().set(id, Json.readTree(tree)));
System.out.println(
"From inner function: " + templateTreeHolder.get());
// complete when all work has been done
subStage.complete(null);
}));
// use this stage for dependent actions
return subStage;
})
.thenApply(
notUsed -> {
String includedTemplateIdsStr =
getKeysFromTemplateTree(templateTreeHolder.get()).toJavaList().toString();
System.out.println("From outer function: " + templateTreeHolder.get());
return Tuple.of(templateTreeHolder.get(), includedTemplateIdsStr);
});
在上面的代码中,如果您的操作在完成尝试之前因异常而失败,则 Future 将永远不会完成。一般模式是这样的:
CompletableFuture<Type> stage = new CompletableFuture<>();
…
try {
code that will eventually call complete on stage
}
catch(Throwable t) {
stage.completeExceptionally(t);
}
但是,当然,当应该完成阶段的代码也承担异步处理时,它会变得更复杂一些,所以你必须保护试图提交实际完成代码的代码,以及实际完成代码。
因此,更详细的内部代码版本如下所示:
CompletableFuture<Object> subStage = new CompletableFuture<>();
try {
templateIds.map(
id ->
// do something and return CompletionStage<String>
.thenAccept(
tree -> {
templateTreeHolder.set(
(ObjectNode)
templateTreeHolder.get().set(id, Json.readTree(tree)));
System.out.println(
"From inner function: " + templateTreeHolder.get());
})
.whenComplete((v,t) -> {
// complete when all work has been done
if(t != null) subStage.completeExceptionally(t);
else subStage.complete(v);
}));
} catch(Throwable t) {
subStage.completeExceptionally(t);
}
// use this stage for dependent actions
return subStage;
(也许,“做某事和 return CompletionStage” 也必须用 try { … } catch(Throwable t) { subStage.completeExceptionally(t); }
保护)