CompletableFuture:转换与组合

CompletableFuture: transformation vs. composition

请考虑 "Modern Java in Action" 书中的示例(第 2 版,清单 16.16,第 405 页)。在那里,我们有三个 map 操作来从流中的所有商店获取产品的折扣价列表。首先,我们联系每个商店以获得包含非折扣价格和折扣类型的响应,然后将响应解析为 Quote 对象,并将其传递给远程折扣服务,该服务 returns 是一个已打折价格的字符串.

public List<String> findPrices(String product) {

    List<CompletableFuture<String>> priceFutures =
        shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(
                    () -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote ->
                    CompletableFuture.supplyAsync(
                        () -> Discount.applyDiscount(quote), executor)))
            .collect(toList());

    return priceFutures.stream()
            .map(CompletableFuture::join)
            .collect(toList());
}

我的问题不是关于 thenApplythenCompose 之间的区别。我相信,后者用于避免像 CompletableFuture<CompletableFuture<...>> 这样的嵌套构造。但是,我不明白的是,为什么我们需要在这里创建另一层 CompletableFuture?似乎作者通过创建然后展平嵌套 CompletableFuture 来为代码添加了一些人为的复杂性,而不是像这样在第三个映射中简单地使用 thenApplyAsync

            .map(shop -> CompletableFuture.supplyAsync(
                    () -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenApplyAsync(Discount::applyDiscount, executor))

这两种映射用法(原始的 thenComposethenApplyAsync 的用法)是否等效?两者都接受先前映射的结果作为参数,都提供自定义执行器来执行任务,并且都 return 相同 CompletableFuture<String> 结果。

是的,thenComposesupplyAsync 实现与直接使用 thenApplyAsync 相同。

我没有读过这本书,但可能是某些示例代码侧重于某些主题或功能,而不是最简洁或最快的代码。因此,假设您正在考虑使用类似的代码,我会提出一些建议。


关于此代码的另一个建议是,通过连续调用 map 来链接每个 CompletableFuture 有点奇怪。似乎当前的示例是建立在先前的基于 Stream 的方法之上的,该方法具有多个调用,并且保持原样但使用了 CompletableFuture.

我更喜欢一个 map 并直接链接每个 CompletableFuture,这也允许将其重构为自己的方法。

所以这个:

            .map(shop -> CompletableFuture.supplyAsync(
                    () -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenApplyAsync(Discount::applyDiscount, executor))

会变成这样:

            .map(shop ->
                CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)
                .thenApply(Quote::parse)
                .thenApplyAsync(Discount::applyDiscount, executor))

这个 lambda 很容易变成一个方法,在没有 Stream 的情况下可以重用,它可以与另一个 CompletableFuture 组合,可以测试,可以模拟,等等


另一个建议是让你的代码一直异步,这样findPrices就不会在join(或get,就此而言)。

阻塞的问题在于它可能会阻塞执行程序上的最后一个可用线程,从而因线程耗尽而引发死锁。您的代码所依赖的异步代码最终需要 运行 在执行程序上,可能永远不会 运行.

public CompletableFuture<List<String>> findPricesAsync(String product) {
    // List<CompletableFuture<String>> priceFutures = ...

    CompletableFuture<Void> all = CompletableFuture.allOf(priceFutures.toArray(new CompletableFuture<String>[priceFutures.size()]));
    return all.thenRun(() -> priceFutures.stream()
        .map(CompletableFuture::join));
}

请注意,return 类型从 List<String> 更改为 CompletableFuture<List<String>>。另请注意,最后一次调用 join 不会阻塞,因为调用它的每个 CompletableFuture 都已完成。


最后,我倾向于 return CompletionStage,因为它允许 CompletableFuture 以外的假设实现。我还假设 returned 对象也实现了 Future,它允许对结果使用 get,但不能使用 join,区别在于声明的抛出异常类型。

在一个案例中,我为异步 return CompletionStage 制作了类似 NIO 的方法 I/O,我实现了 CompletableFuture 的子类来覆盖使用的默认执行程序在每个没有 executor 参数的 *Async 方法中。自 Java 9 以来,这变得更容易了,仍然是通过子类化,但它只需要覆盖 defaultExecutor。我进行子类化的主要原因是使用组合的替代方法会导致 代码(包装结果等等)。另一个原因,但不是真正让我担心的,是每个实例都有一个额外的对象被垃圾收集。

这只是为了证明可能存在实际需要自定义 CompletionStage 实现的情况,它可能是也可能不是 CompletableFuture.

的子类