How to wrap a Supplier in a Java Future that returns a fallback value on timeout but keeps running in the background (with the option to cancel)
For example, I have a supplier that may take a while to run:
Supplier<Integer> numLoader = sneaky(() -> {
    Thread.sleep(10000);
    System.out.println("5 Calculated!");
    return 5;
});
* sneaky is just a utility that converts checked exceptions into runtime exceptions.
I would like to be able to do something like this:
Future<Integer> future = createFutureValueOnTimeout(-1, numLoader);
// numLoader takes 10 seconds to complete so -1 is returned.
int num = future.get(1000, TimeUnit.MILLISECONDS);
if (resourcesAreLow()) {
    future.cancel(true);
}
doSomethingWithTheValue(num);
I also have a partial implementation of createFutureValueOnTimeout:
private static <V> Future<V> createFutureValueOnTimeout(V v, Supplier<V> supplier) {
    CompletableFuture<V> completableFuture = CompletableFuture.supplyAsync(supplier);
    return new FutureDecorator<V>(completableFuture) {
        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
            return completableFuture.completeOnTimeout(v, timeout, unit).get();
        }
    };
}
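The problem is that when cancel is called, the sleep is not interrupted.
- How can I make cancel take effect?
- Is there a simpler way to return a value on timeout?
Complete test: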
public class TimeoutTest {

    @SneakyThrows
    @Test
    public void testTimeout() {
        int loadTimeMillis = 10000;
        Supplier<Integer> numLoader = () -> {
            try {
                // Simulate long operation
                Thread.sleep(loadTimeMillis);
            } catch (InterruptedException e) {
                System.out.println("Interrupted! message: " + e.getMessage());
                throw Lombok.sneakyThrow(e);
            }
            System.out.println("5 Calculated!");
            return 5;
        };
        Future<Integer> future = createFutureValueOnTimeout(-1, numLoader);
        long start = System.currentTimeMillis();
        // numLoader takes 10 seconds to complete so -1 is returned.
        int num = future.get(1000, TimeUnit.MILLISECONDS);
        System.out.println("Got: num: " + num + ". time: " + (System.currentTimeMillis() - start));
        if (resourcesAreLow()) {
            future.cancel(true);
        }
        // Don't stop the test. Give time for the cancel to kick in.
        Thread.sleep(loadTimeMillis);
        System.out.println("Finished. Time: " + (System.currentTimeMillis() - start));
    }

    private boolean resourcesAreLow() {
        return true;
    }

    private static <V> Future<V> createFutureValueOnTimeout(V v, Supplier<V> supplier) {
        CompletableFuture<V> completableFuture = CompletableFuture.supplyAsync(supplier);
        return new FutureDecorator<V>(completableFuture) {
            @Override
            public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
                return completableFuture.completeOnTimeout(v, timeout, unit).get();
            }
        };
    }

    private static class FutureDecorator<V> implements Future<V> {
        private final Future<V> inner;

        private FutureDecorator(Future<V> inner) {
            this.inner = inner;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return inner.cancel(mayInterruptIfRunning);
        }

        @Override
        public boolean isCancelled() {
            return inner.isCancelled();
        }

        @Override
        public boolean isDone() {
            return inner.isDone();
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            return inner.get();
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return inner.get(timeout, unit);
        }
    }
}
Output (note the missing "Interrupted!" message):
Got: num: -1. time: 1007
5 Calculated!
Finished. Time: 11021
You can combine the Executor/Future API, which supports cancellation by interruption, with CompletableFuture:
public static <R> CompletableFuture<R> withInterruptionSupport(Callable<R> c) {
    CompletableFuture<R> cf = new CompletableFuture<>();
    FutureTask<R> ft = new FutureTask<R>(c) {
        @Override
        protected void set(R v) {
            super.set(v);
            cf.complete(v);
        }
        @Override
        protected void setException(Throwable t) {
            super.setException(t);
            cf.completeExceptionally(t);
        }
    };
    cf.defaultExecutor().execute(ft);
    cf.whenComplete((x, y) -> ft.cancel(true));
    return cf;
}
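Since supporting interruption in the actual function usually means dealing with InterruptedException, it is more convenient to use Callable than Supplier, because Callable is allowed to throw this exception.
The Future that supports cancellation by interruption is cancelled unconditionally when the CompletableFuture completes. This causes no problems: whenever the completion stems from the task itself, the future is already completed and the subsequent cancellation is ignored.
That is, we don't need to distinguish between the different ways the CompletableFuture can complete. Not only does completeOnTimeout work, you can also call cancel(…) on the CompletableFuture and it will interrupt the Callable's evaluation (the boolean argument is still irrelevant, though). Even calling complete with an alternative result, without waiting for a timeout, will interrupt the now-obsolete evaluation.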
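The claim that a late cancellation is simply ignored can be checked in isolation. The following is a minimal sketch, not part of the original answer; the class and method names (CancelAfterDoneDemo, completedTask) are made up for illustration. It runs a FutureTask to completion on the calling thread and then cancels it.

```java
import java.util.concurrent.FutureTask;

class CancelAfterDoneDemo {

    // Hypothetical helper: builds a task and runs it to completion synchronously.
    static FutureTask<Integer> completedTask() {
        FutureTask<Integer> ft = new FutureTask<>(() -> 5);
        ft.run(); // executes the Callable on the current thread
        return ft;
    }

    public static void main(String[] args) throws Exception {
        FutureTask<Integer> ft = completedTask();
        // Cancelling a task that has already finished has no effect.
        System.out.println("cancel returned " + ft.cancel(true)); // false
        System.out.println("isCancelled = " + ft.isCancelled()); // false
        System.out.println("value = " + ft.get());               // 5
    }
}
```

This is why the whenComplete callback in withInterruptionSupport can call ft.cancel(true) without first checking how the CompletableFuture was completed.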
So the following works:
for (int timeout : new int[] { 5, 15 }) {
    System.out.println("with timeout of " + timeout);
    Integer i = withInterruptionSupport(() -> {
            Thread.sleep(10000);
            System.out.println("5 Calculated!");
            return 5;
        })
        .completeOnTimeout(42, timeout, TimeUnit.SECONDS)
        .join();
    System.out.println("got " + i);
}
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
with timeout of 5
got 42
with timeout of 15
5 Calculated!
got 5
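To tie this back to the flow in the question (take a fallback after one second, let the work continue, cancel only if needed), here is a minimal sketch. It assumes Java 9 or later (for defaultExecutor()) and repeats withInterruptionSupport from above so that it is self-contained. It deliberately uses get with a timeout and a locally chosen fallback instead of completeOnTimeout, because completing the future would itself trigger the interrupt. The names CancelDemo and fallbackThenCancel are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CancelDemo {

    // Same helper as in the answer above.
    static <R> CompletableFuture<R> withInterruptionSupport(Callable<R> c) {
        CompletableFuture<R> cf = new CompletableFuture<>();
        FutureTask<R> ft = new FutureTask<R>(c) {
            @Override
            protected void set(R v) {
                super.set(v);
                cf.complete(v);
            }
            @Override
            protected void setException(Throwable t) {
                super.setException(t);
                cf.completeExceptionally(t);
            }
        };
        cf.defaultExecutor().execute(ft);
        cf.whenComplete((x, y) -> ft.cancel(true));
        return cf;
    }

    // Hypothetical scenario: returns true if the worker observed the interrupt.
    static boolean fallbackThenCancel() {
        CountDownLatch interrupted = new CountDownLatch(1);
        CompletableFuture<Integer> cf = withInterruptionSupport(() -> {
            try {
                Thread.sleep(10_000); // simulate a long operation
            } catch (InterruptedException e) {
                interrupted.countDown(); // record that the interrupt arrived
                throw e;
            }
            return 5;
        });
        try {
            int num;
            try {
                num = cf.get(1, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                num = -1; // fallback chosen by the caller; the task keeps running
            }
            System.out.println("got " + num);
            cf.cancel(true); // completes cf, which cancels and interrupts the FutureTask
            return interrupted.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("worker interrupted: " + fallbackThenCancel());
    }
}
```

If the cancel call is skipped, the Callable is left alone and finishes after ten seconds, which is the "keeps running in the background" behaviour asked for.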