将 CompletableStage 动态地与最终的错误传播结合起来
Combine CompletableStage dynamically with eventually error propagation
我需要动态组合 CompletionStages
(基于计算结果)以便不阻塞执行,最终我需要捕获操作期间可能出现的异常,以便小心地关闭执行。
我已经实现了如下内容:
public CompletableFuture<Data> getData() {
final Data accumulator = new Data();
CompletableFuture<Data> result = new CompletableFuture<>();
CompletableStage exec = ... //starting execution
exec.thenComposeAsync(
(res) -> process(accumulator, res)
).thenAccept(t -> result.complete(accumulator));
return result;
}
private CompletionStage<Void> process(Data acc, Result res) {
res.data().forEach(
currData -> {
add.addData(currData);
}
);
if (res.hasMoreData()) {
return res.fetchNextData().thenComposeAsync(
(nextData) -> process(acc, nextData)
);
}
return CompletableFuture.completedFuture(null);
}
我不知道这是否是实施该解决方案的最佳方式,但如果一切正常,它会起作用。当 forEach
块由于任何原因出现异常时,问题就来了,错误不会传播回 getData
调用者所以我无法用 exceptionally
方法捕获它以停止我的以安全的方式应用。我想我做错了什么。
当传递给thenComposeAsync
的函数异常失败时,thenComposeAsync
返回的future会异常完成。这会导致由链式普通操作创建的期货也异常完成,而无需评估它们的功能。
该规则有三种例外情况,exceptionally
, which is only evaluated after an exceptional completion, to produce a replacement value, whereas handle
and whenComplete
在任何一种情况下都会进行评估。
所以当你想用回退值替换异常时,你可以使用
exec.thenComposeAsync(res -> process(accumulator, res))
.exceptionally(throwable -> fallBack)
.thenAccept(t -> result.complete(accumulator));
必须注意在 thenAccept
之前链接 exceptionally
,否则在特殊情况下传递给 thenAccept
的函数将不会被计算。
当你想将异常传播到 result
未来时,你可以使用
exec.thenComposeAsync(res -> process(accumulator, res))
.whenComplete((value, throwable) -> {
if(throwable == null) result.complete(accumulator);
else result.completeExceptionally(throwable);
});
检查 throwable
与 null
以确定完成是否异常是至关重要的,因为 value
可以是 null
作为普通结果。即使在普通结果值永远不可能是 null
的场景中,也建议坚持惯用的解决方案,因为您不知道是否以及何时会重用代码。
我需要动态组合 CompletionStages
(基于计算结果)以便不阻塞执行,最终我需要捕获操作期间可能出现的异常,以便小心地关闭执行。
我已经实现了如下内容:
public CompletableFuture<Data> getData() {
final Data accumulator = new Data();
CompletableFuture<Data> result = new CompletableFuture<>();
CompletableStage exec = ... //starting execution
exec.thenComposeAsync(
(res) -> process(accumulator, res)
).thenAccept(t -> result.complete(accumulator));
return result;
}
private CompletionStage<Void> process(Data acc, Result res) {
res.data().forEach(
currData -> {
add.addData(currData);
}
);
if (res.hasMoreData()) {
return res.fetchNextData().thenComposeAsync(
(nextData) -> process(acc, nextData)
);
}
return CompletableFuture.completedFuture(null);
}
我不知道这是否是实施该解决方案的最佳方式,但如果一切正常,它会起作用。当 forEach
块由于任何原因出现异常时,问题就来了,错误不会传播回 getData
调用者所以我无法用 exceptionally
方法捕获它以停止我的以安全的方式应用。我想我做错了什么。
当传递给thenComposeAsync
的函数异常失败时,thenComposeAsync
返回的future会异常完成。这会导致由链式普通操作创建的期货也异常完成,而无需评估它们的功能。
该规则有三种例外情况,exceptionally
, which is only evaluated after an exceptional completion, to produce a replacement value, whereas handle
and whenComplete
在任何一种情况下都会进行评估。
所以当你想用回退值替换异常时,你可以使用
exec.thenComposeAsync(res -> process(accumulator, res))
.exceptionally(throwable -> fallBack)
.thenAccept(t -> result.complete(accumulator));
必须注意在 thenAccept
之前链接 exceptionally
,否则在特殊情况下传递给 thenAccept
的函数将不会被计算。
当你想将异常传播到 result
未来时,你可以使用
exec.thenComposeAsync(res -> process(accumulator, res))
.whenComplete((value, throwable) -> {
if(throwable == null) result.complete(accumulator);
else result.completeExceptionally(throwable);
});
检查 throwable
与 null
以确定完成是否异常是至关重要的,因为 value
可以是 null
作为普通结果。即使在普通结果值永远不可能是 null
的场景中,也建议坚持惯用的解决方案,因为您不知道是否以及何时会重用代码。