Java - CompletableFutures - 如果有例外,我如何取消所有期货
Java - CompletableFutures - How can i cancel all futures if there are exceptions
我有一个方法(包含在下面)来 return CompletableFuture
列表的值。
该方法应该:
- 能够在给定时间后超时。
- 如果有超过 n 个例外,则能够取消所有期货。
第一点效果很好,确实在超过超时限制后就炸了。 (之后我仍然需要调用 exectuorService.shutdownNow()
到 return 给调用者)。我遇到的问题是我要完成的第二件事。
假设我有一个包含 20,000 个期货的列表,它们都会有一个例外,那么为什么让它们全部执行,如果我看到有太多的例外,那么我假设所有的都出了问题期货,我想取消它们。
此外,我希望每个未来都有一个超时,这可能需要多长时间,但这也行不通,出于下面概述的相同原因而谦逊。
似乎是因为当我调用allDoneFuture.thenApply()
时,此时它等待并让所有期货完成,无论是成功还是异常。只有在所有这些都完成之后,它才会遍历每个未来并获取其结果。到那时取消有什么好处,当他们已经完成时。
如果有人能告诉我如何满足这个特定需求,我将不胜感激:“监视异常和个别超时,并基于此取消所有其他”。
谢谢。
下面是我写的方法:
/**
* @param futures a list of completable futures
* @param timeout how long to allow the futures to run before throwing exception
* @param timeUnit unit of timeout
* @param allowedExceptions how many of the futures do we tolerate exceptions,
* NOTE: if an exception is thrown from the futures it will return null, until it reaches the allowedExceptions threshold
* */
public static <T> List<T> extractFromFutures(List<CompletableFuture<T>> futures, int timeout, TimeUnit timeUnit, int allowedExceptions) {
CompletableFuture<Void> allDoneFuture = CompletableFuture
.allOf(futures.toArray(new CompletableFuture[futures.size()]));
try {
AtomicInteger exceptionCount = new AtomicInteger(0);
return allDoneFuture.thenApply(v ->//when all are done
futures.stream().
map(future -> {
try {
//if only I could set an individual timeout
return future.get(timeout, timeUnit);
} catch (Exception e) {
future.cancel(true);
int curExceptionCnt = exceptionCount.incrementAndGet();
if(curExceptionCnt >= allowedExceptions){
//I would've hoped that it will throw it to the calling try-catch
//and then cancel all futures, but it doesn't
throw new RuntimeException(e);
}
else{
return null;
}
}
}).
collect(Collectors.<T>toList())
).get(timeout, timeUnit);
} catch (Exception e) {
allDoneFuture.cancel(true);
throw new RuntimeException(e);
}
}
要在一定数量的异常后取消所有剩余的期货,您可以对每个异常调用 exceptionally
并增加异常计数并可能在其中取消它们。
对于单独的超时,您可以创建一个 class 来保存未来的超时,然后根据超时对它们进行排序,并调用 get
用超时减去经过的时间。
static class FutureWithTimeout<T> {
CompletableFuture<T> f;
long timeout;
TimeUnit timeUnit;
FutureWithTimeout(CompletableFuture<T> f, long timeout, TimeUnit timeUnit) {
this.f = f;
this.timeout = timeout;
this.timeUnit = timeUnit;
}
}
public static <T> List<T> extractFromFutures(List<FutureWithTimeout<T>> futures, int allowedExceptions) {
AtomicInteger exceptionCount = new AtomicInteger(0);
futures.forEach(f -> f.f.exceptionally(t -> {
if(exceptionCount.getAndIncrement() == allowedExceptions){
futures.forEach(c -> c.f.cancel(false));
}
return null;
}));
long t = System.nanoTime();
return futures.stream()
.sorted(Comparator.comparingLong(f -> f.timeUnit.toNanos(f.timeout)))
.map(f -> {
try {
return f.f.get(Math.max(0, f.timeUnit.toNanos(f.timeout) - (System.nanoTime() - t)),
TimeUnit.NANOSECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException ex) {
f.f.cancel(false);
return null;
}
})
.collect(Collectors.toList());
}
请注意,这可能 return 列表的顺序与传入的顺序不同。如果您需要它的顺序相同,则可以将 map().collect()
更改为 forEachOrdered
然后在不排序的情况下将它们重新映射到结果中。
此外 cancel
的 mayInterruptIfRunning
参数对 CompletableFuture
没有影响,所以我将其更改为 false。
CompletableFuture
完全忽略对 cancel(true)
的任何调用。我不知道为什么(大概是为了简化 API),但它很糟糕。如果你想让 futures 实际上可以取消(你可以手动检查中断,或者通过锁定来接受取消),那么你必须使用 Future
,而不是 CompletableFuture
.
我有一个方法(包含在下面)来 return CompletableFuture
列表的值。
该方法应该:
- 能够在给定时间后超时。
- 如果有超过 n 个例外,则能够取消所有期货。
第一点效果很好,确实在超过超时限制后就炸了。 (之后我仍然需要调用 exectuorService.shutdownNow()
到 return 给调用者)。我遇到的问题是我要完成的第二件事。
假设我有一个包含 20,000 个期货的列表,它们都会有一个例外,那么为什么让它们全部执行,如果我看到有太多的例外,那么我假设所有的都出了问题期货,我想取消它们。
此外,我希望每个未来都有一个超时,这可能需要多长时间,但这也行不通,出于下面概述的相同原因而谦逊。
似乎是因为当我调用allDoneFuture.thenApply()
时,此时它等待并让所有期货完成,无论是成功还是异常。只有在所有这些都完成之后,它才会遍历每个未来并获取其结果。到那时取消有什么好处,当他们已经完成时。
如果有人能告诉我如何满足这个特定需求,我将不胜感激:“监视异常和个别超时,并基于此取消所有其他”。
谢谢。
下面是我写的方法:
/**
* @param futures a list of completable futures
* @param timeout how long to allow the futures to run before throwing exception
* @param timeUnit unit of timeout
* @param allowedExceptions how many of the futures do we tolerate exceptions,
* NOTE: if an exception is thrown from the futures it will return null, until it reaches the allowedExceptions threshold
* */
public static <T> List<T> extractFromFutures(List<CompletableFuture<T>> futures, int timeout, TimeUnit timeUnit, int allowedExceptions) {
CompletableFuture<Void> allDoneFuture = CompletableFuture
.allOf(futures.toArray(new CompletableFuture[futures.size()]));
try {
AtomicInteger exceptionCount = new AtomicInteger(0);
return allDoneFuture.thenApply(v ->//when all are done
futures.stream().
map(future -> {
try {
//if only I could set an individual timeout
return future.get(timeout, timeUnit);
} catch (Exception e) {
future.cancel(true);
int curExceptionCnt = exceptionCount.incrementAndGet();
if(curExceptionCnt >= allowedExceptions){
//I would've hoped that it will throw it to the calling try-catch
//and then cancel all futures, but it doesn't
throw new RuntimeException(e);
}
else{
return null;
}
}
}).
collect(Collectors.<T>toList())
).get(timeout, timeUnit);
} catch (Exception e) {
allDoneFuture.cancel(true);
throw new RuntimeException(e);
}
}
要在一定数量的异常后取消所有剩余的期货,您可以对每个异常调用 exceptionally
并增加异常计数并可能在其中取消它们。
对于单独的超时,您可以创建一个 class 来保存未来的超时,然后根据超时对它们进行排序,并调用 get
用超时减去经过的时间。
static class FutureWithTimeout<T> {
CompletableFuture<T> f;
long timeout;
TimeUnit timeUnit;
FutureWithTimeout(CompletableFuture<T> f, long timeout, TimeUnit timeUnit) {
this.f = f;
this.timeout = timeout;
this.timeUnit = timeUnit;
}
}
public static <T> List<T> extractFromFutures(List<FutureWithTimeout<T>> futures, int allowedExceptions) {
AtomicInteger exceptionCount = new AtomicInteger(0);
futures.forEach(f -> f.f.exceptionally(t -> {
if(exceptionCount.getAndIncrement() == allowedExceptions){
futures.forEach(c -> c.f.cancel(false));
}
return null;
}));
long t = System.nanoTime();
return futures.stream()
.sorted(Comparator.comparingLong(f -> f.timeUnit.toNanos(f.timeout)))
.map(f -> {
try {
return f.f.get(Math.max(0, f.timeUnit.toNanos(f.timeout) - (System.nanoTime() - t)),
TimeUnit.NANOSECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException ex) {
f.f.cancel(false);
return null;
}
})
.collect(Collectors.toList());
}
请注意,这可能 return 列表的顺序与传入的顺序不同。如果您需要它的顺序相同,则可以将 map().collect()
更改为 forEachOrdered
然后在不排序的情况下将它们重新映射到结果中。
此外 cancel
的 mayInterruptIfRunning
参数对 CompletableFuture
没有影响,所以我将其更改为 false。
CompletableFuture
完全忽略对 cancel(true)
的任何调用。我不知道为什么(大概是为了简化 API),但它很糟糕。如果你想让 futures 实际上可以取消(你可以手动检查中断,或者通过锁定来接受取消),那么你必须使用 Future
,而不是 CompletableFuture
.