如何有效地使用 CompletableFuture 为每个输入映射异步任务

How to efficiently use CompletableFuture to map async task per input

我想 return 映射包含所有键到值的映射,API 对这些键的响应。我为此使用 CompletableFutureGuava。以下是我的尝试。有没有其他标准方法可以用 Java 8 和线程 APIs 实现相同的效果?

地图是id -> apiResponse(id)

    
    public static List<String> returnAPIResponse(Integer key) {
        return Lists.newArrayList(key.toString() + " Test");
    }

    public static void main(String[] args) {
        List<Integer> keys = Lists.newArrayList(1, 2, 3, 4);

        List<CompletableFuture<SimpleEntry<Integer, List<String>>>> futures = keys
            .stream()
            .map(key -> CompletableFuture.supplyAsync(
                () -> new AbstractMap.SimpleEntry<>(key, returnAPIResponse(key))))
            .collect(Collectors.toList());

        System.out.println(
            futures.parallelStream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));

    }

这里有一个有趣的行为,我将尽力解释。让我们从简单开始,让我们暂时忘记 CompletableFuture 并简单地使用普通的 parallelStream 来执行此操作,并添加一个小的调试步骤:

List<Integer> keys = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);

Map<Integer, List<String>> result =
    keys.parallelStream()
        .map(x -> new AbstractMap.SimpleEntry<>(x, returnAPIResponse(x)))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

System.out.println("parallelism : " + pool.getParallelism() + " current : " + pool.getPoolSize());

在我的机器上,打印出:

parallelism : 11 current : 11

我假设您已经知道 parallelStream 的操作在 common ForkJoinPool 中执行。该输出的含义可能也很明显:11 threads 可用并且全部已使用。

我现在稍微修改一下你的例子:

List<Integer> keys = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);

ForkJoinPool pool = ForkJoinPool.commonPool();
ExecutorService supplyPool = Executors.newFixedThreadPool(2);

Map<Integer, List<String>> result =
keys.parallelStream()
    .map(x -> CompletableFuture.supplyAsync(
             () -> new AbstractMap.SimpleEntry<>(x, returnAPIResponse(x)),
             supplyPool
    ))
    .map(CompletableFuture::join)
    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

 System.out.println("parallelism : " + pool.getParallelism() + " current : " + pool.getPoolSize());

其实只是一个重要的改动,我会让你的supplyAsync运行在自己的线程池中;其余的是一样的。 运行 这,揭示:

parallelism : 11 current : 16

惊喜。创建了比我们想要的更多的线程?好吧,getPoolSize 的文档说:

Returns the number of worker threads that have started but not yet terminated. The result returned by this method may differ from getParallelism when threads are created to maintain parallelism when others are cooperatively blocked.

您的案例中的阻止是通过 map(CompletableFuture::join) 发生的。您已经从 ForkJoinPool 中有效地阻止了一个工作线程,并且它通过旋转另一个线程来补偿它。


如果不想陷入这样的惊喜:

List<CompletableFuture<AbstractMap.SimpleEntry<Integer, List<String>>>> list =
keys.stream()
    .map(x -> CompletableFuture.supplyAsync(
         () -> new AbstractMap.SimpleEntry<>(x, returnAPIResponse(x)),
         supplyPool
     ))
    .collect(Collectors.toList());

CompletableFuture.allOf(list.toArray(new CompletableFuture[0])).join();

Map<Integer, List<String>> result =
list.stream()
    .map(CompletableFuture::join)
    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

因为ForJoinPool的worker线程上没有join,可以drop parallelStream。然后我仍然 block 通过以下方式获得结果:

CompletableFuture.allOf(list.toArray(new CompletableFuture[0])).join();

但不会生成补偿线程。因为 CompletableFuture.allOf returns a CompletableFuture<Void>,我需要再次流式传输才能得到结果。

不要让最后一个流操作中的.map(CompletableFuture::join)骗了你,没有阻塞是因为前面的CompletableFuture::allOf已经阻塞并等待所有任务完成