使用 CompletableFuture 执行单个或多个 Callables 并避免阻塞

Use CompletableFuture to execute single or multiple Callables and avoid blocking

我通过 ThreadPoolExecutor 执行了一些调用。如果线程列表仅包含 1 个可调用项,那么我直接调用 CallableServicecall 方法。如果列表包含超过 1 个可调用对象,那么我将通过线程池执行器并行执行所有这些线程。

如何使用 Java 8 CompletableFuture 实现此目的?如果future.get()增强以避免阻塞,那将是一个加号。

private static ThreadPoolExecutor myThreadPoolExecutor = new ThreadPoolExecutor(0, 100, 5L, TimeUnit.SECONDS, new SynchronousQueue<>());

public static void execute(List<Callable<Boolean>> threadList) throws Exception {

    List<Future<Boolean>> futureList = null;
    CallableService singleService = (CallableService) threadList.get(0);
    if (1 == threadList.size()) {
        singleService.call();
    }
    else {
        try {
            futureList = myThreadPoolExecutor.invokeAll(threadList);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    if (null != futureList) {
        for (Future<Boolean> future : futureList) {
            try {
                future.get();
            }
            catch (Exception e) {
                //do some calculations here and then throw exception
                throw new Exception(e.getMessage(), e);
            }
        }
    }
}

Future.isDone() 告诉我们执行者是否完成了任务处理。如果任务完成,它将 return 为真,否则,它将 return 为假。

 for (Future<Boolean> future : futureList) {
   while(!future.isDone()) 
   {
          doSOmethingElse();
          Thread.sleep(300);//Optional
    }
 try {
                future.get();
        }
    catch (Exception e) 
 {
                //do some calculations here and then throw exception
                throw new Exception(e.getMessage(), e);
    }
}

但我们不必担心这一点,因为在确保任务完成后我们到达了调用 get() 的地步。

I execute a few callables through ThreadPoolExecutor. If thread list contains only 1 callable then I directly call call method of my CallableService. If list contains more than 1 callables then I execute all those threads in parallel via thread pool executor.

我猜你已经实现了这部分。 (如果您的作业繁重并且配置了 100 个线程 运行ning,您可能 运行 会遇到内存使用问题。但这是一个不同的问题。)

And if future.get() is enhanced to avoid blocking, that will be a plus.

为此,您可以采用以下方法:

  1. 创建另一个 ExecutorService,其工作将只是 运行 Future.get() 电话。
  2. 将您的 Future.get() 提交到该服务,如下所示。
  3. 关闭它并等待终止。

    if (null != futureList) {
        ExecutorService waitSvc = Executors.newCachedThreadPool();
        for (Future<Boolean> future : futureList) {
            try {
                waitSvc.submit( () -> future.get() );
            }
            catch (Exception e) {
                //do some calculations here and then throw exception
                throw new Exception(e.getMessage(), e);
            }
        }
        waitSvc.shutdown(); //This may take some time. You may want to call awaitTermination() after this.
    }
    

但是,我觉得你应该重新设计使用这么多线程的整体方法,除非这只是一个学习应用程序。

不需要 CompletableFuture,因为您使用 ExecutorService 的方式就足够了,不过,代码流的某些方面可以改进。您获取第一个元素,即使不需要,也可以无缘无故地将其转换为 CallableService,因为您已经可以通过 Callable 接口调用该方法。在另一个分支中,您正在捕获 InterruptedException 并继续,因此调用者永远不会知道并非所有作业都已执行。在直接的代码流中,您不需要检查 null:

列表
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
    if(1 == threadList.size()) {
        Callable<Boolean> singleService = threadList.get(0);
        singleService.call();
    }
    else {
        List<Future<Boolean>> futureList = myThreadPoolExecutor.invokeAll(threadList);
        for(Future<Boolean> future : futureList) {
            try {
                future.get();
            }
            catch(Exception e) {
                //do some calculations here and then throw exception
                throw new Exception(e.getMessage(), e);
            }
        }
    }
}

您可以进一步缩短为

public static void execute(List<Callable<Boolean>> threadList) throws Exception {
    if(1 == threadList.size()) {
        threadList.get(0).call();
    }
    else {
        for(Future<Boolean> future : myThreadPoolExecutor.invokeAll(threadList)) {
            try {
                future.get();
            }
            catch(Exception e) {
                //do some calculations here and then throw exception
                throw new Exception(e.getMessage(), e);
            }
        }
    }
}

但这是首选编码风格的问题。但请注意,它引起了我的注意,在单个元素的情况下,您没有执行相同的异常处理。


要使用 CompletableFuture,我们需要一个适配器方法,因为便捷方法 supplyAsync 需要 Supplier 而不是 Callable。使用 的修改变体,我们得到

public static void execute(List<Callable<Boolean>> threadList) throws Exception {
    if(1 == threadList.size()) {
        threadList.get(0).call();
    }
    else {
        CompletableFuture<?> all = CompletableFuture.allOf(
            threadList.stream()
                .map(c -> callAsync(c, myThreadPoolExecutor))
                .toArray(CompletableFuture<?>[]::new));
        try {
            all.get();
        }
        catch(Exception e) {
            //do some calculations here and then throw exception
            throw new Exception(e.getMessage(), e);
        }
    }
}
public static <R> CompletableFuture<R> callAsync(Callable<R> callable, Executor e) {
    CompletableFuture<R> cf = new CompletableFuture<>();
    CompletableFuture.runAsync(() -> {
        try { cf.complete(callable.call()); }
        catch(Throwable ex) { cf.completeExceptionally(ex); }
    }, e);
    return cf;
}

所以我们没有 invokeAll 负责提交所有作业。我们必须通过循环或流操作手动执行此操作。另一方面,我们通过代表完成状态的 allOf 获得一个单一的未来,如果至少有一个工作失败,则例外。

与等待完成的 invokeAll 不同,allOf 仅 return 未来,因此等待完成的是 all.get() 调用。我们可以在它之前做其他事情,甚至可以使用这个 属性 来始终在调用者线程中执行第一项工作:

public static void execute(List<Callable<Boolean>> threadList) throws Exception {
    CompletableFuture<?> tail = CompletableFuture.allOf(
        threadList.stream().skip(1)
            .map(c -> callAsync(c, myThreadPoolExecutor))
            .toArray(CompletableFuture<?>[]::new)),
        head = callAsync(threadList.get(0), Runnable::run);
    try {
        head.get();
        tail.get();
    }
    catch(Exception e) {
        //do some calculations here and then throw exception
        throw new Exception(e.getMessage(), e);
    }
}

这将始终调用当前线程中的第一个可调用对象,因为 Runnable::run 用作 Executor 将立即在调用线程中执行操作。但在其他所有方面都统一对待,尤其是异常处理。当只有一项工作时,allOf 使用空数组调用将什么都不做,return 一个已经完成的未来,这将达到预期的效果。