如何在 allOf() 超时后安全地取消所有未决期货

How to safely cancel all pending futures after timing out on allOf()

我正在尝试重构顺序等待多个 futures 完成的代码,而不是共同等待完成。

所以我尝试使用

等待多个 futures 一次超时
// Example outcomes
final CompletableFuture<String> completedFuture
    = CompletableFuture.completedFuture("hello");
final CompletableFuture<String> failedFuture
    = new CompletableFuture<>();
failedFuture.completeExceptionally(new RuntimeException("Test Stub Exception"));
final CompletableFuture<String> incompleteFuture
    = new CompletableFuture<>();

final AtomicBoolean timeoutHandled = new AtomicBoolean(false);
final CompletableFuture<String> checkedFuture
    = incompleteFuture.whenComplete(
         (x, e) -> timeoutHandled.set(e instanceof TimeoutException));

// this example timeouts after 1ms
try {
    CompletableFuture
        .allOf(completedFuture, checkedFuture, failedFuture)
        .get(1, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
    Thread.currentThread().interrupt();
} catch (final TimeoutException e) {
    // probably do something here?
}

// but the incomplete future is still pending
assertTrue(checkedFuture.isCompletedExceptionally());
// this still fails even if checkedFuture.completeExceptionally(e) is called
assertTrue(timeoutHandled.get());

然而上面的断言失败了,因为当集体未来超时时,个人未来还没有超时。我想取消这样的个人期货,就像他们单独 运行 超时一样,因为他们可能有单独的 whenComplete() 处理程序处理 TimeoutExceptions:

Expecting
  <CompletableFuture[Incomplete]>
to be completed exceptionally.

是否有一个 useful/safe 模式,我可以通过它遍历所有异常并调用 completeExceptionally() 来模拟每个未来的超时,并确保在移动之前调用了所有“异常处理程序”在?

您可以使用 try/catch 创建可变参数方法,循环遍历每个 CompletableFuture 并调用 completeExceptionally()。

static void completeFutures(CompletableFuture<?>... completableFutures) throws ExecutionException {
        try {
            CompletableFuture.allOf(completableFutures).get(1, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (final TimeoutException e) {
            for (CompletableFuture<?> cf : completableFutures) {
                cf.completeExceptionally(e);
            }
        }
    }