CompletableFuture:转换与组合
CompletableFuture: transformation vs. composition
请考虑 "Modern Java in Action" 书中的示例(第 2 版,清单 16.16,第 405 页)。在那里,我们有三个 map 操作来从流中的所有商店获取产品的折扣价列表。首先,我们联系每个商店以获得包含非折扣价格和折扣类型的响应,然后将响应解析为 Quote 对象,并将其传递给远程折扣服务,该服务 returns 是一个已打折价格的字符串.
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor)))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
我的问题不是关于 thenApply
和 thenCompose
之间的区别。我相信,后者用于避免像 CompletableFuture<CompletableFuture<...>>
这样的嵌套构造。但是,我不明白的是,为什么我们需要在这里创建另一层 CompletableFuture
?似乎作者通过创建然后展平嵌套 CompletableFuture
来为代码添加了一些人为的复杂性,而不是像这样在第三个映射中简单地使用 thenApplyAsync
:
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenApplyAsync(Discount::applyDiscount, executor))
这两种映射用法(原始的 thenCompose
和 thenApplyAsync
的用法)是否等效?两者都接受先前映射的结果作为参数,都提供自定义执行器来执行任务,并且都 return 相同 CompletableFuture<String>
结果。
是的,thenCompose
和 supplyAsync
实现与直接使用 thenApplyAsync
相同。
我没有读过这本书,但可能是某些示例代码侧重于某些主题或功能,而不是最简洁或最快的代码。因此,假设您正在考虑使用类似的代码,我会提出一些建议。
关于此代码的另一个建议是,通过连续调用 map
来链接每个 CompletableFuture
有点奇怪。似乎当前的示例是建立在先前的基于 Stream
的方法之上的,该方法具有多个调用,并且保持原样但使用了 CompletableFuture
.
我更喜欢一个 map
并直接链接每个 CompletableFuture
,这也允许将其重构为自己的方法。
所以这个:
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenApplyAsync(Discount::applyDiscount, executor))
会变成这样:
.map(shop ->
CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)
.thenApply(Quote::parse)
.thenApplyAsync(Discount::applyDiscount, executor))
这个 lambda 很容易变成一个方法,在没有 Stream
的情况下可以重用,它可以与另一个 CompletableFuture
组合,可以测试,可以模拟,等等
另一个建议是让你的代码一直异步,这样findPrices
就不会在join
(或get
,就此而言)。
阻塞的问题在于它可能会阻塞执行程序上的最后一个可用线程,从而因线程耗尽而引发死锁。您的代码所依赖的异步代码最终需要 运行 在执行程序上,可能永远不会 运行.
public CompletableFuture<List<String>> findPricesAsync(String product) {
// List<CompletableFuture<String>> priceFutures = ...
CompletableFuture<Void> all = CompletableFuture.allOf(priceFutures.toArray(new CompletableFuture<String>[priceFutures.size()]));
return all.thenRun(() -> priceFutures.stream()
.map(CompletableFuture::join));
}
请注意,return 类型从 List<String>
更改为 CompletableFuture<List<String>>
。另请注意,最后一次调用 join
不会阻塞,因为调用它的每个 CompletableFuture
都已完成。
最后,我倾向于 return CompletionStage
,因为它允许 CompletableFuture
以外的假设实现。我还假设 returned 对象也实现了 Future
,它允许对结果使用 get
,但不能使用 join
,区别在于声明的抛出异常类型。
在一个案例中,我为异步 return CompletionStage
制作了类似 NIO 的方法 I/O,我实现了 CompletableFuture
的子类来覆盖使用的默认执行程序在每个没有 executor 参数的 *Async
方法中。自 Java 9 以来,这变得更容易了,仍然是通过子类化,但它只需要覆盖 defaultExecutor
。我进行子类化的主要原因是使用组合的替代方法会导致 多 代码(包装结果等等)。另一个原因,但不是真正让我担心的,是每个实例都有一个额外的对象被垃圾收集。
这只是为了证明可能存在实际需要自定义 CompletionStage
实现的情况,它可能是也可能不是 CompletableFuture
.
的子类
请考虑 "Modern Java in Action" 书中的示例(第 2 版,清单 16.16,第 405 页)。在那里,我们有三个 map 操作来从流中的所有商店获取产品的折扣价列表。首先,我们联系每个商店以获得包含非折扣价格和折扣类型的响应,然后将响应解析为 Quote 对象,并将其传递给远程折扣服务,该服务 returns 是一个已打折价格的字符串.
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor)))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
我的问题不是关于 thenApply
和 thenCompose
之间的区别。我相信,后者用于避免像 CompletableFuture<CompletableFuture<...>>
这样的嵌套构造。但是,我不明白的是,为什么我们需要在这里创建另一层 CompletableFuture
?似乎作者通过创建然后展平嵌套 CompletableFuture
来为代码添加了一些人为的复杂性,而不是像这样在第三个映射中简单地使用 thenApplyAsync
:
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenApplyAsync(Discount::applyDiscount, executor))
这两种映射用法(原始的 thenCompose
和 thenApplyAsync
的用法)是否等效?两者都接受先前映射的结果作为参数,都提供自定义执行器来执行任务,并且都 return 相同 CompletableFuture<String>
结果。
是的,thenCompose
和 supplyAsync
实现与直接使用 thenApplyAsync
相同。
我没有读过这本书,但可能是某些示例代码侧重于某些主题或功能,而不是最简洁或最快的代码。因此,假设您正在考虑使用类似的代码,我会提出一些建议。
关于此代码的另一个建议是,通过连续调用 map
来链接每个 CompletableFuture
有点奇怪。似乎当前的示例是建立在先前的基于 Stream
的方法之上的,该方法具有多个调用,并且保持原样但使用了 CompletableFuture
.
我更喜欢一个 map
并直接链接每个 CompletableFuture
,这也允许将其重构为自己的方法。
所以这个:
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenApplyAsync(Discount::applyDiscount, executor))
会变成这样:
.map(shop ->
CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)
.thenApply(Quote::parse)
.thenApplyAsync(Discount::applyDiscount, executor))
这个 lambda 很容易变成一个方法,在没有 Stream
的情况下可以重用,它可以与另一个 CompletableFuture
组合,可以测试,可以模拟,等等
另一个建议是让你的代码一直异步,这样findPrices
就不会在join
(或get
,就此而言)。
阻塞的问题在于它可能会阻塞执行程序上的最后一个可用线程,从而因线程耗尽而引发死锁。您的代码所依赖的异步代码最终需要 运行 在执行程序上,可能永远不会 运行.
public CompletableFuture<List<String>> findPricesAsync(String product) {
// List<CompletableFuture<String>> priceFutures = ...
CompletableFuture<Void> all = CompletableFuture.allOf(priceFutures.toArray(new CompletableFuture<String>[priceFutures.size()]));
return all.thenRun(() -> priceFutures.stream()
.map(CompletableFuture::join));
}
请注意,return 类型从 List<String>
更改为 CompletableFuture<List<String>>
。另请注意,最后一次调用 join
不会阻塞,因为调用它的每个 CompletableFuture
都已完成。
最后,我倾向于 return CompletionStage
,因为它允许 CompletableFuture
以外的假设实现。我还假设 returned 对象也实现了 Future
,它允许对结果使用 get
,但不能使用 join
,区别在于声明的抛出异常类型。
在一个案例中,我为异步 return CompletionStage
制作了类似 NIO 的方法 I/O,我实现了 CompletableFuture
的子类来覆盖使用的默认执行程序在每个没有 executor 参数的 *Async
方法中。自 Java 9 以来,这变得更容易了,仍然是通过子类化,但它只需要覆盖 defaultExecutor
。我进行子类化的主要原因是使用组合的替代方法会导致 多 代码(包装结果等等)。另一个原因,但不是真正让我担心的,是每个实例都有一个额外的对象被垃圾收集。
这只是为了证明可能存在实际需要自定义 CompletionStage
实现的情况,它可能是也可能不是 CompletableFuture
.