从 Recursive CompletableFuture 获取所有结果

Get all results from Recursive CompletableFuture

场景是这样的:它可能会随机生成一些数据,如果随机生成,则需要递归检索数据,最后我需要获取所有生成的数据。

interface DataProvider {
  List<String> randomData(String url);
}

public static void main(String[] args) {
    List<String> strings = fetch(Executors.newFixedThreadPool(4), new DataProvider() {
        final Random random = new Random();
        @Override
        public List<String> randomData(String url) {
            if (random.nextBoolean()) {
                System.out.println("provide some data");
                return List.of(UUID.randomUUID().toString());
            }
            return null;
        }
    }, List.of("a", "b", "c"));
    System.out.println("results are: ");
    System.out.println(strings);
}

private static List<String> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
    if (items == null || items.isEmpty())
        return new ArrayList<>();
    List<CompletableFuture<List<String>>> collect =
            items.stream()
                    .map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
                    .collect(Collectors.toList());
    List<CompletableFuture<List<String>>> list = new ArrayList<>();
    collect.forEach(item -> {
        CompletableFuture<List<String>> listCompletableFuture = item.thenApplyAsync(strings -> fetch(es, dataProvider, strings), es);
        list.add(listCompletableFuture);
    });
    return list.stream().flatMap(item -> item.join().stream()).collect(Collectors.toList());
}

有时程序会死机,有时会打印一个空集合。 (打印provide some data)。

我哪里错了?我对 CompletableFuture 一点都不熟悉,所以可能整个递归调用是错误的。 (或者代码可以更简单,因为CompletableFuture有很多方法)。

您的代码确实存在一些问题:

逻辑问题:结果只能是一个空列表!

fetch() 的结果将是:

  • 如果 items 为空,则为空列表
  • items 列表中的每个项目递归应用 fetch(randomData(item)) 的结果 由于没有 return 非空列表的递归叶,并且父调用也不向列表添加任何内容,因此它只能 return 一个空结果。

也许您还想在结果中包含 randomData()

技术问题:在固定线程池中使用 CompletableFuture.join()

您正在使用 Executors.newFixedThreadPool(4)。顾名思义,线程的数量是固定的,所以当所有线程都用完后,新的任务就会排队。

问题是您正在使用 join() 等待其中一些新任务。所以一方面你阻塞了一个线程,另一方面你在等待你刚刚放入队列的东西。

由于是递归的,如果达到4的深度就会死锁

防止这种情况的最简单方法是使用 ForkJoinPool。当其中一个线程被阻塞时,这种池将生成新线程。

附带说明,ForkJoinPool 使用守护线程,因此它们不会阻止 JVM 终止。如果您想使用固定线程池,您必须对其调用 shutdown() 以确保它允许终止 - 或者将其配置为使用守护线程。

其他可能的改进

您的代码正在构建大量中间集合并阻塞大量 join() 调用。由于 CompletionStage 的实现,CompletableFuture 旨在链接相关的计算阶段,而不仅仅是将其用作可以完成的 Future

在不改变您当前逻辑的情况下,首先您可以使 fetch() return 成为 Stream,这会删除很多 collect()/stream() 调用:

private static Stream<String> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
    if (items == null || items.isEmpty())
        return Stream.empty();
    List<CompletableFuture<Stream<String>>> list =
            items.stream()
                    .map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
                    .map(item -> item.thenApplyAsync(strings -> fetch(es, dataProvider, strings), es))
                    .collect(Collectors.toList());
    return list.stream().flatMap(CompletableFuture::join);
}

为了达到这个目的,改变了以下内容:

  • forEach() / list.add() 替换为 collect.stream(…).map(…).collect(toList())
  • 内联 collect 变量
  • 删除结果 .collect(toList()).stream(),因为它是多余的
  • 将return类型更改为Stream<String>,主要变化是: ** 将 list 的类型更改为 List<CompletableFuture<Stream<String>>>(因为内部 Stream 来自第二个 map(… -> fetch()) 调用 ** 从 return 语句
  • 中删除 collect()stream()

请注意,您需要保留中间 list 以确保在第一次调用 CompletableFuture::join 之前提供所有任务。

这样更好,但调用仍然是同步的并且需要 ForkJoinPool 才能工作。既然fetch()里面的逻辑大部分是异步的,为什么不让整个方法异步呢?

这主要是需要把return的类型改成CompletableFuture<Stream<String>>,然后用allOf()创建一个future,当所有的中间future都完成的时候才会完成:

private static CompletableFuture<Stream<String>> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
    if (items == null || items.isEmpty())
        return CompletableFuture.completedFuture(Stream.empty());
    List<CompletableFuture<Stream<String>>> list =
            items.stream()
                    .map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
                    .map(item -> item.thenComposeAsync(strings -> fetch(es, dataProvider, strings), es))
                    .collect(Collectors.toList());
    return CompletableFuture.allOf(list.toArray(new CompletableFuture[0]))
            .thenApply(dummy -> list.stream().flatMap(CompletableFuture::join));
}

现在这是 100% 异步的,因为 join() 只会在已知已完成的 future 上调用(感谢 allOf()),因此永远不会阻塞。

它不再需要 ForkJoinPool。事实上,它甚至可以 运行 上一个 newFixedThreadPool(1)!好吧,只要你在最后调用 shutdown() 就可以了……