使用 ExecutorService 时出现超时问题

Issue with timeouts when using ExecutorService

我有一个包含数十万 Callable 个对象的列表。当每个都是 运行 时,它会根据给定的值执行可能很长的计算。因此,我想 运行 每个任务异步执行(最好通过使用某种执行器)并在 30 秒后检索每个计算的结果,取消那些没有及时完成的​​。 (获得的值用于其他地方。)

到目前为止我是这样实现的:

private void process() {
    class Runner implements Callable<BigDecimal> {
        final int x, y;

        Runner(int x, int y) {
            this.x = x;
            this.y = y;
        }

        @Override
        public BigDecimal call() {
            BigDecimal gWidth = xMax.subtract(xMin), gHeight = yMax.subtract(yMin);
            BigDecimal gy = gHeight.multiply(BigDecimal.valueOf(-y)).divide(BigDecimal.valueOf(height)).add(yMax);
            BigDecimal gx = gWidth.multiply(BigDecimal.valueOf(x)).divide(BigDecimal.valueOf(width)).add(xMin);

            // The calculation begins when this method is called
            BigDecimal result = calculateAt(gx, gy);
            return result;
        }
    }

    ExecutorService exec = Executors.newCachedThreadPool();
    List<Runner> runners = new ArrayList<>();
    for (int y = 0; y < height; y++) {
        for (int x = 0; x < width; x++) {
            runners.add(new Runner(x, y));
        }
    }

    try {
        List<Future<BigDecimal>> results = exec.invokeAll(runners, 30, TimeUnit.SECONDS);
        for (Future<BigDecimal> future : results) {
            // Check if the future's task was cancelled and process the results
        }
    } catch (InterruptedException | ExecutionException ex) {
        ex.printStackTrace();
    }
    exec.shutdown();
}

// Extra variables and methods
BigDecimal xMin = BigDecimal.valueOf(-7),
           xMax = BigDecimal.valueOf(7),
           yMin = BigDecimal.valueOf(-7),
           yMax = BigDecimal.valueOf(7);
int height = 850, width = 850;
private BigDecimal calculateAt(BigDecimal x, BigDecimal y) {
    try {
        // Just to simulate a potential execution time
        Thread.sleep((ThreadLocalRandom.current().nextInt(45) + 1) * 1000);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
    return BigDecimal.ONE;
}

ArrayListrunners存储每个Callable待执行的任务,然后发送给ExecutorService运行所有任务。我遇到的问题是 运行 任务似乎是同步启动的,超时 30 秒后,只有前 40 或 5 万个任务完成,更不用说开始执行了。

似乎正在发生的事情是 ExecutorService.invokeAll 方法仅允许 30 秒 window 用于 所有 列表中的任务 执行完毕。相反,我需要的是这 30 秒 window 在每个任务的基础上开始,即允许任务完成 30 秒 一旦它已经开始。 invokeAll 似乎没有这样做,至少没有 newCachedThreadPool。是否有 Java 库或其他实现方法?

最简单的方法是单独调用每个任务,例如在您初始化 Runner 的循环中。由于您上面提到的行为符合 JavaDoc documentation of ExecutorService,因此没有任何子类(至少 Java 标准版随附的子类)会有不同的行为。实现此行为的独特功能也不存在。

来自 #invokeAll(List<? extends Callable>, long, TimeUnit) 的文档:

Executes the given tasks, returning a list of Futures holding their status and results when all complete or the timeout expires, whichever happens first. Future.isDone() is true for each element of the returned list. Upon return, tasks that have not completed are cancelled. Note that a completed task could have terminated either normally or by throwing an exception. The results of this method are undefined if the given collection is modified while this operation is in progress.

我想你可以用 CompletableFuture 解决问题。

例如,这是一个基于问题片段的代码:

private static final ExecutorService executor = Executors.newCachedThreadPool();
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(100);

private static void processAsync() {
  List<CompletableFuture<Object>> futureList = IntStream.range(0, height).boxed()
    .flatMap(y -> IntStream.range(0, width).boxed().map(x -> new Runner(x, y)))
    .map(runner ->
      CompletableFuture.anyOf(
        CompletableFuture.supplyAsync(runner, executor),
        timeout(Duration.ofSeconds(30))
      ).exceptionally(throwable -> {
        // timeout is handled here
        return BigDecimal.ZERO;
      })
    )
    .collect(Collectors.toList());

  CompletableFuture.allOf(futureList.toArray(new CompletableFuture<?>[0]))
    .thenAccept(v -> {
      List<BigDecimal> results = futureList.stream()
        .map(CompletableFuture::join)
        .map(r -> (BigDecimal) r)
        .collect(Collectors.toList());
      // process final results
      BigDecimal sum = results.stream().reduce(BigDecimal.ZERO, BigDecimal::add);
      System.out.println("Final sum: " + sum);
    })
    .exceptionally(throwable -> {
      System.out.println("Failed allOf with " + throwable);
      return null;
    });
}

private static CompletableFuture<BigDecimal> timeout(Duration duration) {
  CompletableFuture<BigDecimal> future = new CompletableFuture<>();
  scheduler.schedule(
    () -> future.completeExceptionally(new TimeoutException("Timeout " + Thread.currentThread().getName())), duration.toMillis(), MILLISECONDS);
  return future;
}

private static class Runner implements Supplier<BigDecimal> {...

这里的主要思想是使用CompletableFuture.anyOf并将其应用于您有用的任务CompletableFuture和超时任务的CopmpletableFuture。超时是通过使用 ScheduledExecutorServiceCompletableFuture.completeExceptionally 实现的。所以,基本上,anyOf returns 要么是结果,要么是 TimeoutException.