如何用 java 未来包装供应商 returns 一个超时值但在后台保持 运行 (可选择取消)

How to wrap a supplier with a java future that returns a value on timeout but keeps running in the background (with option to cancel)

例如,我有一个供应商可能需要时间 运行:

Supplier<Integer> numLoader = sneaky(() -> {
    Thread.sleep(10000);
    System.out.println("5 Calculated!");
    return 5;
});

* sneaky 只是一个转换为 运行 时间异常的实用程序。

我希望能够做这样的事情:

Future<Integer> future = createFutureValueOnTimeout(-1, numLoader);
// numLoader takes 10 seconds to complete so -1 is returned.
int num = future.get(1000, TimeUnit.MILLISECONDS);
if (resourcesAreLow()) {
    future.cancel(true);
}
doSomethingWithTheValue(num);

我也有 createFutureValueOnTimeout 的部分实现:

private static <V> Future<V> createFutureValueOnTimeout(V v, Supplier<V> supplier) {
    CompletableFuture<V> completableFuture = CompletableFuture.supplyAsync(supplier);
    return new FutureDecorator<V>(completableFuture) {
        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
            return completableFuture.completeOnTimeout(v, timeout, unit).get();
        }
    };
}

问题是调用 cancel 时,sleep 没有被中断。

  1. 我怎样才能让取消生效?
  2. 是否有更简单的方法 return 超时值?

完成测试:

public class TimeoutTest {
    @SneakyThrows
    @Test
    public void testTimeout() {
        int loadTimeMillis = 10000;
        Supplier<Integer> numLoader = () -> {
            try {
                // Simulate long operation
                Thread.sleep(loadTimeMillis);
            } catch (InterruptedException e) {
                System.out.println("Interrupted! message: " + e.getMessage());
                throw Lombok.sneakyThrow(e);
            }
            System.out.println("5 Calculated!");
            return 5;
        };

        Future<Integer> future = createFutureValueOnTimeout(-1, numLoader);
        
        long start = System.currentTimeMillis();

        // numLoader takes 10 seconds to complete so -1 is returned.
        int num = future.get(1000, TimeUnit.MILLISECONDS);

        System.out.println("Got: num: " + num + ". time: " + (System.currentTimeMillis() - start));

        if (resourcesAreLow()) {
            future.cancel(true);
        }
        // Don't stop the test. Give time for the cancel to kick in.
        Thread.sleep(loadTimeMillis);
        System.out.println("Finished. Time: " + (System.currentTimeMillis() - start));
    }

    private boolean resourcesAreLow() {
        return true;
    }

    private static <V> Future<V> createFutureValueOnTimeout(V v, Supplier<V> supplier) {
        CompletableFuture<V> completableFuture = CompletableFuture.supplyAsync(supplier);
        return new FutureDecorator<V>(completableFuture) {
            @Override
            public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
                return completableFuture.completeOnTimeout(v, timeout, unit).get();
            }
        };
    }

    private static class FutureDecorator<V> implements Future<V> {
        private final Future<V> inner;

        private FutureDecorator(Future<V> inner) {this.inner = inner;}

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return inner.cancel(mayInterruptIfRunning);
        }

        @Override
        public boolean isCancelled() {
            return inner.isCancelled();
        }

        @Override
        public boolean isDone() {
            return inner.isDone();
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            return inner.get();
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return inner.get(timeout, unit);
        }
    }
}

输出:(注意缺少 Interrupted! 消息

Got: num: -1. time: 1007
5 Calculated!
Finished. Time: 11021

可以把支持取消的Executor/FutureAPI和CompletableFuture组合起来:

public static <R> CompletableFuture<R> withInterruptionSupport(Callable<R> c) {
    CompletableFuture<R> cf = new CompletableFuture<>();
    FutureTask<R> ft = new FutureTask<R>(c) {
        @Override
        protected void set(R v) {
            super.set(v);
            cf.complete(v);
        }
        @Override
        protected void setException(Throwable t) {
            super.setException(t);
            cf.completeExceptionally(t);
        }
    };
    cf.defaultExecutor().execute(ft);
    cf.whenComplete((x,y) -> ft.cancel(true));
    return cf;
}

由于在实际函数中支持中断通常意味着处理InterruptedException,使用CallableSupplier更方便,所以允许抛出这个异常。

支持中断取消的FutureCompletableFuture完成时无条件取消,只要完成源于任务本身,未来已经完成,后续的工作就没有问题取消将被忽略。

也就是说,我们这里不需要区分不同的补全可能性。 completeOnTimeout 不仅有效,您还可以在 CompletableFuture 上调用 cancel(…) ,它会中断 Callable 的计算(boolean 参数仍然无关紧要尽管)。即使在不等待超时的情况下使用替代结果调用 complete 也会中断 now-obsolete 评估。

因此,以下工作:

for(int timeout: new int[] { 5, 15 }) {
    System.out.println("with timeout of " + timeout);
    Integer i = withInterruptionSupport(() -> {
            Thread.sleep(10000);
            System.out.println("5 Calculated!");
            return 5;
        })
        .completeOnTimeout(42, timeout, TimeUnit.SECONDS)
        .join();
    System.out.println("got " + i);
}
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
with timeout of 5
got 42
with timeout of 15
5 Calculated!
got 5