如何强制超时和取消异步 CompletableFuture 作业

How to enforce timeout and cancel async CompletableFuture Jobs

我正在使用 Java 8,我想知道在 3 个异步作业上强制执行超时的推荐方法,我将执行异步作业并从未来检索结果。请注意,所有 3 个作业的超时时间都相同。如果超过时间限制我也想取消作业

我在想这样的事情:

// Submit jobs async
List<CompletableFuture<String>> futures = submitJobs(); // Uses CompletableFuture.supplyAsync

List<CompletableFuture<Void>> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

try {
    allFutures.get(100L, TimeUnit.MILLISECONDS);
} catch (TimeoutException e){
   for(CompletableFuture f : future) {
      if(!f.isDone()) {
         /*
         From Java Doc:
         @param mayInterruptIfRunning this value has no effect in this
             * implementation because interrupts are not used to control
             * processing.
         */
         f.cancel(true);
      }
   }
}

List<String> output = new ArrayList<>();
for(CompeletableFuture fu : futures) {
   if(!fu.isCancelled()) { // Is this needed?
      output.add(fu.join());
   }
}

return output;

  1. 这样的东西行得通吗?有没有更好的方法?
  2. 如何正确取消未来? Java 医生说,线程不能被中断?那么,如果我要取消一个 future 并调用 join(),我会立即得到结果吗,因为线程不会被中断?
  3. 等待结束后,推荐使用join()还是get()获取结果?

值得注意的是,在 CompletableFuture 上调用 cancel 与在当前阶段调用 completeExceptionally 实际上是一样的。取消不会影响之前的阶段。话虽如此:

  1. 原则上,假设不需要上游取消,这样的事情就可以工作(从伪代码的角度来看,上面有语法错误)。
  2. CompletableFuture取消不会打断当前线程。取消将导致所有下游阶段立即被 CancellationException 触发(将使执行流程短路)。
  3. 'join' 和 'get' 在调用者愿意无限期等待的情况下实际上是相同的。加入句柄为您包装已检查的异常。如果调用者想要超时,则需要get。

包括一个片段来说明取消时的行为。请注意下游流程如何不会启动,但上游流程即使在取消后也会继续。

    public static void main(String[] args) throws Exception
    {
        int maxSleepTime = 1000;
        Random random = new Random();
        AtomicInteger value = new AtomicInteger();
        List<String> calculatedValues = new ArrayList<>();
        Supplier<String> process = () -> { try { Thread.sleep(random.nextInt(maxSleepTime)); System.out.println("Stage 1 Running!"); } catch (InterruptedException e) { e.printStackTrace(); } return Integer.toString(value.getAndIncrement()); };
        List<CompletableFuture<String>> stage1 = IntStream.range(0, 10).mapToObj(val -> CompletableFuture.supplyAsync(process)).collect(Collectors.toList());
        List<CompletableFuture<String>> stage2 = stage1.stream().map(Test::appendNumber).collect(Collectors.toList());
        List<CompletableFuture<String>> stage3 = stage2.stream().map(Test::printIfCancelled).collect(Collectors.toList());
        CompletableFuture<Void> awaitAll = CompletableFuture.allOf(stage2.toArray(new CompletableFuture[0]));
        try
        {
            /*Wait 1/2 the time, some should be complete. Some not complete -> TimeoutException*/
            awaitAll.get(maxSleepTime / 2, TimeUnit.MILLISECONDS);
        }
        catch(TimeoutException ex)
        {
            for(CompletableFuture<String> toCancel : stage2)
            {
                boolean irrelevantValue = false;
                if(!toCancel.isDone())
                    toCancel.cancel(irrelevantValue);
                else
                    calculatedValues.add(toCancel.join());
            }
        }
        System.out.println("All futures Cancelled! But some Stage 1's may still continue printing anyways.");
        System.out.println("Values returned as of cancellation: " + calculatedValues);
        Thread.sleep(maxSleepTime);
    }

    private static CompletableFuture<String> appendNumber(CompletableFuture<String> baseFuture) 
    {
        return baseFuture.thenApply(val -> {  System.out.println("Stage 2 Running"); return "#" + val; }); 
    }
    
    private static CompletableFuture<String> printIfCancelled(CompletableFuture<String> baseFuture) 
    { 
        return baseFuture.thenApply(val ->  { System.out.println("Stage 3 Running!"); return val; }).exceptionally(ex -> { System.out.println("Stage 3 Cancelled!"); return ex.getMessage(); }); 
    }

如果需要取消上游进程(例如:取消某些网络调用),则需要自定义处理。

调用cancel后你不能加入furure,因为你得到一个异常。

终止计算的一种方法是让它拥有对未来的引用并定期检查它:如果它被取消,则从内部中止计算。 如果计算是一个循环,您可以在每次迭代中进行检查,就可以做到这一点。

你需要它成为 CompletableFuture 吗?另一种方法是避免使用 CompleatableFuture,而是使用简单的 Future 或 FutureTask:如果您使用执行器调用 future.cancel(true) 执行它,则可能会终止计算。

回答问题:“调用join(),我会立即得到结果吗”。

不,你不会立即得到它,它会挂起并等待完成计算:没有办法强制在更短的时间内完成需要很长时间的计算。

您可以调用 future.complete(value) 提供一个值,供引用该未来的其他线程用作默认结果。