如何有效地使用 CompletableFuture 为每个输入映射异步任务
How to efficiently use CompletableFuture to map async task per input
我想 return 映射包含所有键到值的映射,API 对这些键的响应。我为此使用 CompletableFuture
和 Guava
。以下是我的尝试。有没有其他标准方法可以用 Java 8 和线程 APIs 实现相同的效果?
地图是id -> apiResponse(id)
。
public static List<String> returnAPIResponse(Integer key) {
return Lists.newArrayList(key.toString() + " Test");
}
public static void main(String[] args) {
List<Integer> keys = Lists.newArrayList(1, 2, 3, 4);
List<CompletableFuture<SimpleEntry<Integer, List<String>>>> futures = keys
.stream()
.map(key -> CompletableFuture.supplyAsync(
() -> new AbstractMap.SimpleEntry<>(key, returnAPIResponse(key))))
.collect(Collectors.toList());
System.out.println(
futures.parallelStream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
这里有一个有趣的行为,我将尽力解释。让我们从简单开始,让我们暂时忘记 CompletableFuture
并简单地使用普通的 parallelStream
来执行此操作,并添加一个小的调试步骤:
List<Integer> keys = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);
Map<Integer, List<String>> result =
keys.parallelStream()
.map(x -> new AbstractMap.SimpleEntry<>(x, returnAPIResponse(x)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
System.out.println("parallelism : " + pool.getParallelism() + " current : " + pool.getPoolSize());
在我的机器上,打印出:
parallelism : 11 current : 11
我假设您已经知道 parallelStream
的操作在 common
ForkJoinPool
中执行。该输出的含义可能也很明显:11 threads
可用并且全部已使用。
我现在稍微修改一下你的例子:
List<Integer> keys = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);
ForkJoinPool pool = ForkJoinPool.commonPool();
ExecutorService supplyPool = Executors.newFixedThreadPool(2);
Map<Integer, List<String>> result =
keys.parallelStream()
.map(x -> CompletableFuture.supplyAsync(
() -> new AbstractMap.SimpleEntry<>(x, returnAPIResponse(x)),
supplyPool
))
.map(CompletableFuture::join)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
System.out.println("parallelism : " + pool.getParallelism() + " current : " + pool.getPoolSize());
其实只是一个重要的改动,我会让你的supplyAsync
运行在自己的线程池中;其余的是一样的。 运行 这,揭示:
parallelism : 11 current : 16
惊喜。创建了比我们想要的更多的线程?好吧,getPoolSize
的文档说:
Returns the number of worker threads that have started but not yet terminated. The result returned by this method may differ from getParallelism when threads are created to maintain parallelism when others are cooperatively blocked.
您的案例中的阻止是通过 map(CompletableFuture::join)
发生的。您已经从 ForkJoinPool
中有效地阻止了一个工作线程,并且它通过旋转另一个线程来补偿它。
如果不想陷入这样的惊喜:
List<CompletableFuture<AbstractMap.SimpleEntry<Integer, List<String>>>> list =
keys.stream()
.map(x -> CompletableFuture.supplyAsync(
() -> new AbstractMap.SimpleEntry<>(x, returnAPIResponse(x)),
supplyPool
))
.collect(Collectors.toList());
CompletableFuture.allOf(list.toArray(new CompletableFuture[0])).join();
Map<Integer, List<String>> result =
list.stream()
.map(CompletableFuture::join)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
因为ForJoinPool
的worker线程上没有join
,可以drop parallelStream
。然后我仍然 block 通过以下方式获得结果:
CompletableFuture.allOf(list.toArray(new CompletableFuture[0])).join();
但不会生成补偿线程。因为 CompletableFuture.allOf
returns a CompletableFuture<Void>
,我需要再次流式传输才能得到结果。
不要让最后一个流操作中的.map(CompletableFuture::join)
骗了你,没有阻塞是因为前面的CompletableFuture::allOf
已经阻塞并等待所有任务完成
我想 return 映射包含所有键到值的映射,API 对这些键的响应。我为此使用 CompletableFuture
和 Guava
。以下是我的尝试。有没有其他标准方法可以用 Java 8 和线程 APIs 实现相同的效果?
地图是id -> apiResponse(id)
。
public static List<String> returnAPIResponse(Integer key) {
return Lists.newArrayList(key.toString() + " Test");
}
public static void main(String[] args) {
List<Integer> keys = Lists.newArrayList(1, 2, 3, 4);
List<CompletableFuture<SimpleEntry<Integer, List<String>>>> futures = keys
.stream()
.map(key -> CompletableFuture.supplyAsync(
() -> new AbstractMap.SimpleEntry<>(key, returnAPIResponse(key))))
.collect(Collectors.toList());
System.out.println(
futures.parallelStream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
这里有一个有趣的行为,我将尽力解释。让我们从简单开始,让我们暂时忘记 CompletableFuture
并简单地使用普通的 parallelStream
来执行此操作,并添加一个小的调试步骤:
List<Integer> keys = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);
Map<Integer, List<String>> result =
keys.parallelStream()
.map(x -> new AbstractMap.SimpleEntry<>(x, returnAPIResponse(x)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
System.out.println("parallelism : " + pool.getParallelism() + " current : " + pool.getPoolSize());
在我的机器上,打印出:
parallelism : 11 current : 11
我假设您已经知道 parallelStream
的操作在 common
ForkJoinPool
中执行。该输出的含义可能也很明显:11 threads
可用并且全部已使用。
我现在稍微修改一下你的例子:
List<Integer> keys = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);
ForkJoinPool pool = ForkJoinPool.commonPool();
ExecutorService supplyPool = Executors.newFixedThreadPool(2);
Map<Integer, List<String>> result =
keys.parallelStream()
.map(x -> CompletableFuture.supplyAsync(
() -> new AbstractMap.SimpleEntry<>(x, returnAPIResponse(x)),
supplyPool
))
.map(CompletableFuture::join)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
System.out.println("parallelism : " + pool.getParallelism() + " current : " + pool.getPoolSize());
其实只是一个重要的改动,我会让你的supplyAsync
运行在自己的线程池中;其余的是一样的。 运行 这,揭示:
parallelism : 11 current : 16
惊喜。创建了比我们想要的更多的线程?好吧,getPoolSize
的文档说:
Returns the number of worker threads that have started but not yet terminated. The result returned by this method may differ from getParallelism when threads are created to maintain parallelism when others are cooperatively blocked.
您的案例中的阻止是通过 map(CompletableFuture::join)
发生的。您已经从 ForkJoinPool
中有效地阻止了一个工作线程,并且它通过旋转另一个线程来补偿它。
如果不想陷入这样的惊喜:
List<CompletableFuture<AbstractMap.SimpleEntry<Integer, List<String>>>> list =
keys.stream()
.map(x -> CompletableFuture.supplyAsync(
() -> new AbstractMap.SimpleEntry<>(x, returnAPIResponse(x)),
supplyPool
))
.collect(Collectors.toList());
CompletableFuture.allOf(list.toArray(new CompletableFuture[0])).join();
Map<Integer, List<String>> result =
list.stream()
.map(CompletableFuture::join)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
因为ForJoinPool
的worker线程上没有join
,可以drop parallelStream
。然后我仍然 block 通过以下方式获得结果:
CompletableFuture.allOf(list.toArray(new CompletableFuture[0])).join();
但不会生成补偿线程。因为 CompletableFuture.allOf
returns a CompletableFuture<Void>
,我需要再次流式传输才能得到结果。
不要让最后一个流操作中的.map(CompletableFuture::join)
骗了你,没有阻塞是因为前面的CompletableFuture::allOf
已经阻塞并等待所有任务完成