如何使用 CompletableFuture 运行 并行调用多个服务?
How to run multiple service calls in parallel using CompletableFuture?
我正在return关注用户的回复
class FinalResponseDTO {
List<Service1ResponseDTO> service1ResponseDTO;
Long totalCount;
List<Service2ResponseDTO> service2ResponseDTO;
}
截至目前,我正在进行三个顺序调用来计算此 FinalResponseDTO
,每个调用都可以 运行 独立于其他调用。我尝试制作三个不同的 CompletableFuture
作为:
CompletableFuture<List<Service1ResponseDTO> future1 = CompletableFuture.supplyAsync(() -> service1.callMethod1());
CompletableFuture<Long> future2 = CompletableFuture.supplyAsync(() -> service2.callMethod2());
CompletableFuture<Service2ResponseDTO> future3 = CompletableFuture.supplyAsync(() -> service3.callMethod3());
如果我这样做 CompletableFuture.allOf(future1, future2, future3).join();
还是我应该打电话给 CompletableFuture.allOf(future1, future2, future3).get();
?即使我调用这些 join
或 get
中的任何一个,那么我应该如何从中构造 FinalResponseDTO
。我是 Java 8 种并发特性的新手,例如 CompletableFuture
我很困惑,因为每个 return 未来的类型都是不同的,我应该如何获得所有这些的组合响应期货然后构建我的最终输出?
来自 CompletableFuture.allOf()
的 Javadoc:
Returns a new CompletableFuture that is completed when all of the given CompletableFutures complete. If any of the given CompletableFutures complete exceptionally, then the returned CompletableFuture also does so, with a CompletionException holding this exception as its cause. Otherwise, the results, if any, of the given CompletableFutures are not reflected in the returned CompletableFuture, but may be obtained by inspecting them individually.
因此,当组合 CompletableFuture 完成时,您可以通过应用构造对象的函数来检查值并使用简单的构造函数构造最终响应对象:
CompletableFuture.allOf(future1, future2, future3).thenApply(v ->
new FinalResponseDTO(future1.join(), future2.join(), future3.join())
);
CompletableFuture<List<Service1ResponseDTO> future1 =
CompletableFuture.supplyAsync(() -> service1.callMethod1());
CompletableFuture<Long> future2 =
CompletableFuture.supplyAsync(() -> service2.callMethod2());
CompletableFuture<List<Service2ResponseDTO>> future3 =
CompletableFuture.supplyAsync(() -> service3.callMethod3());
CompletableFuture.allOf(future1, future2, future3).get();
return new FinalResponseDTO(future1.join(), future2.join(), future3.join());
注意:supplyAsync
在 ForkJoinPool.commonPool()
上运行,因此提供您自己的 执行器 是一个不错的选择,例如Executors.newCachedThreadPool()
:
CompletableFuture.supplyAsync(() -> action, executor);
而不是使用 CompletableFuture.allOf
,您可以回退到 CompletableFuture
的有点隐藏的 Applicative:
static <T, R> CompletableFuture<R> alsoApply(CompletableFuture<T> future, CompletableFuture<Function<T, R>> f) {
return f.thenCompose(future::thenApply);
}
使用这个辅助函数,您可以在并行线程中执行 futures:
CompletableFuture<String> future = alsoApply(
CompletableFuture.supplyAsync(() -> "a"),
CompletableFuture.supplyAsync(() -> "b")
.thenApply(b -> a -> a + b));
assertEquals("ab", future.get());
请参阅 了解它的来源、原因和工作原理。
我正在return关注用户的回复
class FinalResponseDTO {
List<Service1ResponseDTO> service1ResponseDTO;
Long totalCount;
List<Service2ResponseDTO> service2ResponseDTO;
}
截至目前,我正在进行三个顺序调用来计算此 FinalResponseDTO
,每个调用都可以 运行 独立于其他调用。我尝试制作三个不同的 CompletableFuture
作为:
CompletableFuture<List<Service1ResponseDTO> future1 = CompletableFuture.supplyAsync(() -> service1.callMethod1());
CompletableFuture<Long> future2 = CompletableFuture.supplyAsync(() -> service2.callMethod2());
CompletableFuture<Service2ResponseDTO> future3 = CompletableFuture.supplyAsync(() -> service3.callMethod3());
如果我这样做 CompletableFuture.allOf(future1, future2, future3).join();
还是我应该打电话给 CompletableFuture.allOf(future1, future2, future3).get();
?即使我调用这些 join
或 get
中的任何一个,那么我应该如何从中构造 FinalResponseDTO
。我是 Java 8 种并发特性的新手,例如 CompletableFuture
我很困惑,因为每个 return 未来的类型都是不同的,我应该如何获得所有这些的组合响应期货然后构建我的最终输出?
来自 CompletableFuture.allOf()
的 Javadoc:
Returns a new CompletableFuture that is completed when all of the given CompletableFutures complete. If any of the given CompletableFutures complete exceptionally, then the returned CompletableFuture also does so, with a CompletionException holding this exception as its cause. Otherwise, the results, if any, of the given CompletableFutures are not reflected in the returned CompletableFuture, but may be obtained by inspecting them individually.
因此,当组合 CompletableFuture 完成时,您可以通过应用构造对象的函数来检查值并使用简单的构造函数构造最终响应对象:
CompletableFuture.allOf(future1, future2, future3).thenApply(v ->
new FinalResponseDTO(future1.join(), future2.join(), future3.join())
);
CompletableFuture<List<Service1ResponseDTO> future1 =
CompletableFuture.supplyAsync(() -> service1.callMethod1());
CompletableFuture<Long> future2 =
CompletableFuture.supplyAsync(() -> service2.callMethod2());
CompletableFuture<List<Service2ResponseDTO>> future3 =
CompletableFuture.supplyAsync(() -> service3.callMethod3());
CompletableFuture.allOf(future1, future2, future3).get();
return new FinalResponseDTO(future1.join(), future2.join(), future3.join());
注意:supplyAsync
在 ForkJoinPool.commonPool()
上运行,因此提供您自己的 执行器 是一个不错的选择,例如Executors.newCachedThreadPool()
:
CompletableFuture.supplyAsync(() -> action, executor);
而不是使用 CompletableFuture.allOf
,您可以回退到 CompletableFuture
的有点隐藏的 Applicative:
static <T, R> CompletableFuture<R> alsoApply(CompletableFuture<T> future, CompletableFuture<Function<T, R>> f) {
return f.thenCompose(future::thenApply);
}
使用这个辅助函数,您可以在并行线程中执行 futures:
CompletableFuture<String> future = alsoApply(
CompletableFuture.supplyAsync(() -> "a"),
CompletableFuture.supplyAsync(() -> "b")
.thenApply(b -> a -> a + b));
assertEquals("ab", future.get());
请参阅