尝试等待 CompletableFuture
Try waiting for a CompletableFuture
有没有一种方法可以尝试等待 CompletableFuture
一定时间后再给出不同的结果 而不会在超时后取消 未来?
我有一个服务(我们称它为 expensiveService
),它会自行运行以完成自己的工作。 return结果是:
enum Result {
COMPLETED,
PROCESSING,
FAILED
}
我愿意[阻止并]等待它一小段时间(比如 2 秒)。如果它没有完成,我想 return 一个不同的结果,但我希望服务继续做它自己的事情。然后询问服务是否完成(例如通过 websockets 或其他)将是客户的工作。
即我们有以下案例:
expensiveService.processAndGet()
花费 1 秒完成它的未来。它 returns COMPLETED
.
expensiveService.processAndGet()
在 1 秒后失败。它 returns FAILED
.
expensiveService.processAndGet()
花费 5 秒完成它的未来。它 returns PROCESSING
。如果我们向另一个服务请求结果,我们会得到 COMPLETED
.
expensiveService.processAndGet()
5 秒后失败。它 returns PROCESSING
。如果我们向另一个服务请求结果,我们会得到 FAILED
.
在这种特定情况下,我们实际上无论如何都需要在超时时获取当前结果对象,从而导致以下额外的边缘情况。这会导致以下建议的解决方案出现一些问题:
expensiveService.processAndGet()
花费 2.01 秒完成它的未来。它 return 是 PROCESSING
或 COMPLETED
。
我也在使用 Vavr,并且乐于接受使用 Vavr Future
的建议。
我们创建了三种可能的解决方案,它们各有优缺点:
#1 等待另一个未来
CompletableFuture<Result> f = expensiveService.processAndGet();
return f.applyToEither(Future.of(() -> {
Thread.sleep(2000);
return null;
}).map(v -> resultService.get(processId)).toCompletableFuture(),
Function.identity());
问题
- 总是调用第二个
resultService
。
- 我们占用整个线程 2 秒。
#1a 等待检查第一个 Future 的另一个 Future
CompletableFuture<Result> f = expensiveService.processAndGet();
return f.applyToEither(Future.of(() -> {
int attempts = 0;
int timeout = 20;
while (!f.isDone() && attempts * timeout < 2000) {
Thread.sleep(timeout);
attempts++;
}
return null;
}).map(v -> resultService.get(processId)).toCompletableFuture(),
Function.identity());
问题
- 第二个
resultService
仍然总是被调用。
- 我们需要将第一个 Future 传递给第二个,这不是很干净。
#2 Object.notify
Object monitor = new Object();
CompletableFuture<Upload> process = expensiveService.processAndGet();
synchronized (monitor) {
process.whenComplete((r, e) -> {
synchronized (monitor) {
monitor.notifyAll();
}
});
try {
int attempts = 0;
int timeout = 20;
while (!process.isDone() && attempts * timeout < 2000) {
monitor.wait(timeout);
attempts++;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (process.isDone()) {
return process.toCompletableFuture();
} else {
return CompletableFuture.completedFuture(resultService.get(processId));
}
问题
- 复杂代码(可能存在错误,可读性差)。
#3 瓦弗的 Future.await
return Future.of(() -> expensiveService.processAndGet()
.await(2, TimeUnit.SECONDS)
.recoverWith(e -> {
if (e instanceof TimeoutException) {
return Future.successful(resultService.get(processId));
} else {
return Future.failed(e);
}
})
.toCompletableFuture();
问题
- 需要一个未来中的未来以避免
await
取消内部未来。
- 将第一个 Future 移动到第二个会破坏依赖于
ThreadLocal
s 的 [legacy] 代码。
recoverWith
并抓住 TimeoutException
不是那么优雅。
#4 CompletableFuture.orTimeout
return expensiveService.processAndGet()
.orTimeout(2, TimeUnit.SECONDS)
.<CompletableFuture<Upload>>handle((u, e) -> {
if (u != null) {
return CompletableFuture.completedFuture(u);
} else if (e instanceof TimeoutException) {
return CompletableFuture.completedFuture(resultService.get(processId));
} else {
return CompletableFuture.failedFuture(e);
}
})
.thenCompose(Function.identity());
问题
- 虽然在我的例子中
processAndGet
未来没有被取消,但根据文档,它应该被取消。
- 异常处理不好。
#5 CompletableFuture.completeOnTimeout
return expensiveService.processAndGet()
.completeOnTimeout(null, 2, TimeUnit.SECONDS)
.thenApply(u -> {
if (u == null) {
return resultService.get(processId);
} else {
return u;
}
});
问题
- 虽然在我的例子中
processAndGet
未来尚未完成,但根据文档,它应该是。
- 如果
processAndGet
想要 return null
作为不同的状态怎么办?
所有这些解决方案都有缺点并且需要额外的代码,但这感觉像是 CompletableFuture
或 Vavr 的 Future
开箱即用的应该支持的东西。有更好的方法吗?
首先值得指出的是,CompletableFuture
是如何工作的(或者为什么它被这样命名):
CompletableFuture<?> f = CompletableFuture.supplyAsync(supplier, executionService);
基本等同于
CompletableFuture<?> f = new CompletableFuture<>();
executionService.execute(() -> {
if(!f.isDone()) {
try {
f.complete(supplier.get());
}
catch(Throwable t) {
f.completeExceptionally(t);
}
}
});
从 CompletableFuture
到 Executor
正在执行的代码没有任何联系,事实上,我们可以有任意数量的正在进行的完成尝试。特定代码旨在完成 CompletableFuture
实例这一事实只有在调用其中一种完成方法时才会变得明显。
因此,CompletableFuture
不能以任何方式影响 运行 操作,这包括中断取消等。正如 the documentation of CompletableFuture
所说:
Method cancel has the same effect as completeExceptionally(new CancellationException())
所以取消只是另一个完成尝试,如果它是第一个,它将获胜,但不会影响任何其他完成尝试。
所以orTimeout(long timeout, TimeUnit unit)
在这方面没有太大区别。超时过后,它将执行等同于 completeExceptionally(new TimeoutException())
,如果没有其他完成尝试更快,它将获胜,这将影响依赖阶段,但不会影响其他正在进行的完成尝试,例如expensiveService.processAndGet()
在你的案例中引发了什么。
您可以像
一样实现所需的操作
CompletableFuture<Upload> future = expensiveService.processAndGet();
CompletableFuture<Upload> alternative = CompletableFuture.supplyAsync(
() -> resultService.get(processId), CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS));
return future.applyToEither(alternative, Function.identity())
.whenComplete((u,t) -> alternative.cancel(false));
对于 delayedExecutor
,我们使用与 orTimeout
和 completeOnTimeout
相同的工具。当 future.whenComplete
中的取消速度更快时,它不会在指定时间之前评估指定的 Supplier
或根本不评估。 applyToEither
将更快地提供可用的结果。
这并没有在超时时完成 future
,但是如前所述,它的完成无论如何都不会影响原始计算,所以这也可以工作:
CompletableFuture<Upload> future = expensiveService.processAndGet();
CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS)
.execute(() -> {
if(!future.isDone()) future.complete(resultService.get(processId));
});
return future;
如上所述,这会在超时后完成未来,不会影响正在进行的计算,但会向调用者提供替代结果,但它不会将 resultService.get(processId)
抛出的异常传播到返回的未来。
有没有一种方法可以尝试等待 CompletableFuture
一定时间后再给出不同的结果 而不会在超时后取消 未来?
我有一个服务(我们称它为 expensiveService
),它会自行运行以完成自己的工作。 return结果是:
enum Result {
COMPLETED,
PROCESSING,
FAILED
}
我愿意[阻止并]等待它一小段时间(比如 2 秒)。如果它没有完成,我想 return 一个不同的结果,但我希望服务继续做它自己的事情。然后询问服务是否完成(例如通过 websockets 或其他)将是客户的工作。
即我们有以下案例:
expensiveService.processAndGet()
花费 1 秒完成它的未来。它 returnsCOMPLETED
.expensiveService.processAndGet()
在 1 秒后失败。它 returnsFAILED
.expensiveService.processAndGet()
花费 5 秒完成它的未来。它 returnsPROCESSING
。如果我们向另一个服务请求结果,我们会得到COMPLETED
.expensiveService.processAndGet()
5 秒后失败。它 returnsPROCESSING
。如果我们向另一个服务请求结果,我们会得到FAILED
.
在这种特定情况下,我们实际上无论如何都需要在超时时获取当前结果对象,从而导致以下额外的边缘情况。这会导致以下建议的解决方案出现一些问题:
expensiveService.processAndGet()
花费 2.01 秒完成它的未来。它 return 是PROCESSING
或COMPLETED
。
我也在使用 Vavr,并且乐于接受使用 Vavr Future
的建议。
我们创建了三种可能的解决方案,它们各有优缺点:
#1 等待另一个未来
CompletableFuture<Result> f = expensiveService.processAndGet();
return f.applyToEither(Future.of(() -> {
Thread.sleep(2000);
return null;
}).map(v -> resultService.get(processId)).toCompletableFuture(),
Function.identity());
问题
- 总是调用第二个
resultService
。 - 我们占用整个线程 2 秒。
#1a 等待检查第一个 Future 的另一个 Future
CompletableFuture<Result> f = expensiveService.processAndGet();
return f.applyToEither(Future.of(() -> {
int attempts = 0;
int timeout = 20;
while (!f.isDone() && attempts * timeout < 2000) {
Thread.sleep(timeout);
attempts++;
}
return null;
}).map(v -> resultService.get(processId)).toCompletableFuture(),
Function.identity());
问题
- 第二个
resultService
仍然总是被调用。 - 我们需要将第一个 Future 传递给第二个,这不是很干净。
#2 Object.notify
Object monitor = new Object();
CompletableFuture<Upload> process = expensiveService.processAndGet();
synchronized (monitor) {
process.whenComplete((r, e) -> {
synchronized (monitor) {
monitor.notifyAll();
}
});
try {
int attempts = 0;
int timeout = 20;
while (!process.isDone() && attempts * timeout < 2000) {
monitor.wait(timeout);
attempts++;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (process.isDone()) {
return process.toCompletableFuture();
} else {
return CompletableFuture.completedFuture(resultService.get(processId));
}
问题
- 复杂代码(可能存在错误,可读性差)。
#3 瓦弗的 Future.await
return Future.of(() -> expensiveService.processAndGet()
.await(2, TimeUnit.SECONDS)
.recoverWith(e -> {
if (e instanceof TimeoutException) {
return Future.successful(resultService.get(processId));
} else {
return Future.failed(e);
}
})
.toCompletableFuture();
问题
- 需要一个未来中的未来以避免
await
取消内部未来。 - 将第一个 Future 移动到第二个会破坏依赖于
ThreadLocal
s 的 [legacy] 代码。 recoverWith
并抓住TimeoutException
不是那么优雅。
#4 CompletableFuture.orTimeout
return expensiveService.processAndGet()
.orTimeout(2, TimeUnit.SECONDS)
.<CompletableFuture<Upload>>handle((u, e) -> {
if (u != null) {
return CompletableFuture.completedFuture(u);
} else if (e instanceof TimeoutException) {
return CompletableFuture.completedFuture(resultService.get(processId));
} else {
return CompletableFuture.failedFuture(e);
}
})
.thenCompose(Function.identity());
问题
- 虽然在我的例子中
processAndGet
未来没有被取消,但根据文档,它应该被取消。 - 异常处理不好。
#5 CompletableFuture.completeOnTimeout
return expensiveService.processAndGet()
.completeOnTimeout(null, 2, TimeUnit.SECONDS)
.thenApply(u -> {
if (u == null) {
return resultService.get(processId);
} else {
return u;
}
});
问题
- 虽然在我的例子中
processAndGet
未来尚未完成,但根据文档,它应该是。 - 如果
processAndGet
想要 returnnull
作为不同的状态怎么办?
所有这些解决方案都有缺点并且需要额外的代码,但这感觉像是 CompletableFuture
或 Vavr 的 Future
开箱即用的应该支持的东西。有更好的方法吗?
首先值得指出的是,CompletableFuture
是如何工作的(或者为什么它被这样命名):
CompletableFuture<?> f = CompletableFuture.supplyAsync(supplier, executionService);
基本等同于
CompletableFuture<?> f = new CompletableFuture<>();
executionService.execute(() -> {
if(!f.isDone()) {
try {
f.complete(supplier.get());
}
catch(Throwable t) {
f.completeExceptionally(t);
}
}
});
从 CompletableFuture
到 Executor
正在执行的代码没有任何联系,事实上,我们可以有任意数量的正在进行的完成尝试。特定代码旨在完成 CompletableFuture
实例这一事实只有在调用其中一种完成方法时才会变得明显。
因此,CompletableFuture
不能以任何方式影响 运行 操作,这包括中断取消等。正如 the documentation of CompletableFuture
所说:
Method cancel has the same effect as
completeExceptionally(new CancellationException())
所以取消只是另一个完成尝试,如果它是第一个,它将获胜,但不会影响任何其他完成尝试。
所以orTimeout(long timeout, TimeUnit unit)
在这方面没有太大区别。超时过后,它将执行等同于 completeExceptionally(new TimeoutException())
,如果没有其他完成尝试更快,它将获胜,这将影响依赖阶段,但不会影响其他正在进行的完成尝试,例如expensiveService.processAndGet()
在你的案例中引发了什么。
您可以像
一样实现所需的操作CompletableFuture<Upload> future = expensiveService.processAndGet();
CompletableFuture<Upload> alternative = CompletableFuture.supplyAsync(
() -> resultService.get(processId), CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS));
return future.applyToEither(alternative, Function.identity())
.whenComplete((u,t) -> alternative.cancel(false));
对于 delayedExecutor
,我们使用与 orTimeout
和 completeOnTimeout
相同的工具。当 future.whenComplete
中的取消速度更快时,它不会在指定时间之前评估指定的 Supplier
或根本不评估。 applyToEither
将更快地提供可用的结果。
这并没有在超时时完成 future
,但是如前所述,它的完成无论如何都不会影响原始计算,所以这也可以工作:
CompletableFuture<Upload> future = expensiveService.processAndGet();
CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS)
.execute(() -> {
if(!future.isDone()) future.complete(resultService.get(processId));
});
return future;
如上所述,这会在超时后完成未来,不会影响正在进行的计算,但会向调用者提供替代结果,但它不会将 resultService.get(processId)
抛出的异常传播到返回的未来。