Java - CompletableFutures - 如果有例外,我如何取消所有期货

Java - CompletableFutures - How can i cancel all futures if there are exceptions

我有一个方法(包含在下面)来 return CompletableFuture 列表的值。

该方法应该:

  1. 能够在给定时间后超时。
  2. 如果有超过 n 个例外,则能够取消所有期货。

第一点效果很好,确实在超过超时限制后就炸了。 (之后我仍然需要调用 exectuorService.shutdownNow() 到 return 给调用者)。我遇到的问题是我要完成的第二件事。

假设我有一个包含 20,000 个期货的列表,它们都会有一个例外,那么为什么让它们全部执行,如果我看到有太多的例外,那么我假设所有的都出了问题期货,我想取消它们。

此外,我希望每个未来都有一个超时,这可能需要多长时间,但这也行不通,出于下面概述的相同原因而谦逊。

似乎是因为当我调用allDoneFuture.thenApply()时,此时它等待并让所有期货完成,无论是成功还是异常。只有在所有这些都完成之后,它才会遍历每个未来并获取其结果。到那时取消有什么好处,当他们已经完成时。

如果有人能告诉我如何满足这个特定需求,我将不胜感激:“监视异常和个别超时,并基于此取消所有其他”。

谢谢。

下面是我写的方法:

/**
     * @param futures a list of completable futures
     * @param timeout how long to allow the futures to run before throwing exception
     * @param timeUnit unit of timeout
     * @param allowedExceptions how many of the futures do we tolerate exceptions,
     * NOTE: if an exception is thrown from the futures it will return null, until it reaches the allowedExceptions threshold
     * */
    public static <T> List<T> extractFromFutures(List<CompletableFuture<T>> futures, int timeout, TimeUnit timeUnit, int allowedExceptions) {
        CompletableFuture<Void> allDoneFuture = CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[futures.size()]));
        try {
            AtomicInteger exceptionCount = new AtomicInteger(0);
            return allDoneFuture.thenApply(v ->//when all are done
                    futures.stream().
                            map(future -> {
                                try {
                                    //if only I could set an individual timeout
                                    return future.get(timeout, timeUnit);
                                } catch (Exception e) {
                                    future.cancel(true);
                                    int curExceptionCnt = exceptionCount.incrementAndGet();
                                    if(curExceptionCnt >= allowedExceptions){
                                        //I would've hoped that it will throw it to the calling try-catch 
                                        //and then cancel all futures, but it doesn't
                                        throw new RuntimeException(e);
                                    }
                                    else{
                                        return null;
                                    }
                                }
                            }).
                            collect(Collectors.<T>toList())
            ).get(timeout, timeUnit);
        } catch (Exception e) {
            allDoneFuture.cancel(true);
            throw new RuntimeException(e);
        }
    }

要在一定数量的异常后取消所有剩余的期货,您可以对每个异常调用 exceptionally 并增加异常计数并可能在其中取消它们。

对于单独的超时,您可以创建一个 class 来保存未来的超时,然后根据超时对它们进行排序,并调用 get 用超时减去经过的时间。

static class FutureWithTimeout<T> {
    CompletableFuture<T> f;
    long timeout;
    TimeUnit timeUnit;

    FutureWithTimeout(CompletableFuture<T> f, long timeout, TimeUnit timeUnit) {
        this.f = f;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
    }
}
public static <T> List<T> extractFromFutures(List<FutureWithTimeout<T>> futures, int allowedExceptions) {
    AtomicInteger exceptionCount = new AtomicInteger(0);
    futures.forEach(f -> f.f.exceptionally(t -> {
        if(exceptionCount.getAndIncrement() == allowedExceptions){
            futures.forEach(c -> c.f.cancel(false));
        }
        return null;
    }));
    long t = System.nanoTime();
    return futures.stream()
        .sorted(Comparator.comparingLong(f -> f.timeUnit.toNanos(f.timeout)))
        .map(f -> {
            try {
                return f.f.get(Math.max(0, f.timeUnit.toNanos(f.timeout) - (System.nanoTime() - t)), 
                    TimeUnit.NANOSECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException ex) {
                f.f.cancel(false);
                return null;
            }
        })
        .collect(Collectors.toList());
}

请注意,这可能 return 列表的顺序与传入的顺序不同。如果您需要它的顺序相同,则可以将 map().collect() 更改为 forEachOrdered 然后在不排序的情况下将它们重新映射到结果中。

此外 cancelmayInterruptIfRunning 参数对 CompletableFuture 没有影响,所以我将其更改为 false。

CompletableFuture 完全忽略对 cancel(true) 的任何调用。我不知道为什么(大概是为了简化 API),但它很糟糕。如果你想让 futures 实际上可以取消(你可以手动检查中断,或者通过锁定来接受取消),那么你必须使用 Future,而不是 CompletableFuture.