从 Recursive CompletableFuture 获取所有结果
Get all results from Recursive CompletableFuture
场景是这样的:它可能会随机生成一些数据,如果随机生成,则需要递归检索数据,最后我需要获取所有生成的数据。
interface DataProvider {
List<String> randomData(String url);
}
public static void main(String[] args) {
List<String> strings = fetch(Executors.newFixedThreadPool(4), new DataProvider() {
final Random random = new Random();
@Override
public List<String> randomData(String url) {
if (random.nextBoolean()) {
System.out.println("provide some data");
return List.of(UUID.randomUUID().toString());
}
return null;
}
}, List.of("a", "b", "c"));
System.out.println("results are: ");
System.out.println(strings);
}
private static List<String> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
if (items == null || items.isEmpty())
return new ArrayList<>();
List<CompletableFuture<List<String>>> collect =
items.stream()
.map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
.collect(Collectors.toList());
List<CompletableFuture<List<String>>> list = new ArrayList<>();
collect.forEach(item -> {
CompletableFuture<List<String>> listCompletableFuture = item.thenApplyAsync(strings -> fetch(es, dataProvider, strings), es);
list.add(listCompletableFuture);
});
return list.stream().flatMap(item -> item.join().stream()).collect(Collectors.toList());
}
有时程序会死机,有时会打印一个空集合。 (打印provide some data
)。
我哪里错了?我对 CompletableFuture
一点都不熟悉,所以可能整个递归调用是错误的。 (或者代码可以更简单,因为CompletableFuture
有很多方法)。
您的代码确实存在一些问题:
逻辑问题:结果只能是一个空列表!
fetch()
的结果将是:
- 如果
items
为空,则为空列表
- 对
items
列表中的每个项目递归应用 fetch(randomData(item))
的结果
由于没有 return 非空列表的递归叶,并且父调用也不向列表添加任何内容,因此它只能 return 一个空结果。
也许您还想在结果中包含 randomData()
?
技术问题:在固定线程池中使用 CompletableFuture.join()
您正在使用 Executors.newFixedThreadPool(4)
。顾名思义,线程的数量是固定的,所以当所有线程都用完后,新的任务就会排队。
问题是您正在使用 join()
等待其中一些新任务。所以一方面你阻塞了一个线程,另一方面你在等待你刚刚放入队列的东西。
由于是递归的,如果达到4的深度就会死锁
防止这种情况的最简单方法是使用 ForkJoinPool
。当其中一个线程被阻塞时,这种池将生成新线程。
附带说明,ForkJoinPool
使用守护线程,因此它们不会阻止 JVM 终止。如果您想使用固定线程池,您必须对其调用 shutdown()
以确保它允许终止 - 或者将其配置为使用守护线程。
其他可能的改进
您的代码正在构建大量中间集合并阻塞大量 join()
调用。由于 CompletionStage
的实现,CompletableFuture
旨在链接相关的计算阶段,而不仅仅是将其用作可以完成的 Future
。
在不改变您当前逻辑的情况下,首先您可以使 fetch()
return 成为 Stream
,这会删除很多 collect()
/stream()
调用:
private static Stream<String> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
if (items == null || items.isEmpty())
return Stream.empty();
List<CompletableFuture<Stream<String>>> list =
items.stream()
.map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
.map(item -> item.thenApplyAsync(strings -> fetch(es, dataProvider, strings), es))
.collect(Collectors.toList());
return list.stream().flatMap(CompletableFuture::join);
}
为了达到这个目的,改变了以下内容:
- 将
forEach()
/ list.add()
替换为 collect.stream(…).map(…).collect(toList())
- 内联
collect
变量
- 删除结果
.collect(toList()).stream()
,因为它是多余的
- 将return类型更改为
Stream<String>
,主要变化是:
** 将 list
的类型更改为 List<CompletableFuture<Stream<String>>>
(因为内部 Stream
来自第二个 map(… -> fetch())
调用
** 从 return 语句 中删除 collect()
和 stream()
请注意,您需要保留中间 list
以确保在第一次调用 CompletableFuture::join
之前提供所有任务。
这样更好,但调用仍然是同步的并且需要 ForkJoinPool
才能工作。既然fetch()
里面的逻辑大部分是异步的,为什么不让整个方法异步呢?
这主要是需要把return的类型改成CompletableFuture<Stream<String>>
,然后用allOf()
创建一个future,当所有的中间future都完成的时候才会完成:
private static CompletableFuture<Stream<String>> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
if (items == null || items.isEmpty())
return CompletableFuture.completedFuture(Stream.empty());
List<CompletableFuture<Stream<String>>> list =
items.stream()
.map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
.map(item -> item.thenComposeAsync(strings -> fetch(es, dataProvider, strings), es))
.collect(Collectors.toList());
return CompletableFuture.allOf(list.toArray(new CompletableFuture[0]))
.thenApply(dummy -> list.stream().flatMap(CompletableFuture::join));
}
现在这是 100% 异步的,因为 join()
只会在已知已完成的 future 上调用(感谢 allOf()
),因此永远不会阻塞。
它不再需要 ForkJoinPool
。事实上,它甚至可以 运行 上一个 newFixedThreadPool(1)
!好吧,只要你在最后调用 shutdown()
就可以了……
场景是这样的:它可能会随机生成一些数据,如果随机生成,则需要递归检索数据,最后我需要获取所有生成的数据。
interface DataProvider {
List<String> randomData(String url);
}
public static void main(String[] args) {
List<String> strings = fetch(Executors.newFixedThreadPool(4), new DataProvider() {
final Random random = new Random();
@Override
public List<String> randomData(String url) {
if (random.nextBoolean()) {
System.out.println("provide some data");
return List.of(UUID.randomUUID().toString());
}
return null;
}
}, List.of("a", "b", "c"));
System.out.println("results are: ");
System.out.println(strings);
}
private static List<String> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
if (items == null || items.isEmpty())
return new ArrayList<>();
List<CompletableFuture<List<String>>> collect =
items.stream()
.map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
.collect(Collectors.toList());
List<CompletableFuture<List<String>>> list = new ArrayList<>();
collect.forEach(item -> {
CompletableFuture<List<String>> listCompletableFuture = item.thenApplyAsync(strings -> fetch(es, dataProvider, strings), es);
list.add(listCompletableFuture);
});
return list.stream().flatMap(item -> item.join().stream()).collect(Collectors.toList());
}
有时程序会死机,有时会打印一个空集合。 (打印provide some data
)。
我哪里错了?我对 CompletableFuture
一点都不熟悉,所以可能整个递归调用是错误的。 (或者代码可以更简单,因为CompletableFuture
有很多方法)。
您的代码确实存在一些问题:
逻辑问题:结果只能是一个空列表!
fetch()
的结果将是:
- 如果
items
为空,则为空列表 - 对
items
列表中的每个项目递归应用fetch(randomData(item))
的结果 由于没有 return 非空列表的递归叶,并且父调用也不向列表添加任何内容,因此它只能 return 一个空结果。
也许您还想在结果中包含 randomData()
?
技术问题:在固定线程池中使用 CompletableFuture.join()
您正在使用 Executors.newFixedThreadPool(4)
。顾名思义,线程的数量是固定的,所以当所有线程都用完后,新的任务就会排队。
问题是您正在使用 join()
等待其中一些新任务。所以一方面你阻塞了一个线程,另一方面你在等待你刚刚放入队列的东西。
由于是递归的,如果达到4的深度就会死锁
防止这种情况的最简单方法是使用 ForkJoinPool
。当其中一个线程被阻塞时,这种池将生成新线程。
附带说明,ForkJoinPool
使用守护线程,因此它们不会阻止 JVM 终止。如果您想使用固定线程池,您必须对其调用 shutdown()
以确保它允许终止 - 或者将其配置为使用守护线程。
其他可能的改进
您的代码正在构建大量中间集合并阻塞大量 join()
调用。由于 CompletionStage
的实现,CompletableFuture
旨在链接相关的计算阶段,而不仅仅是将其用作可以完成的 Future
。
在不改变您当前逻辑的情况下,首先您可以使 fetch()
return 成为 Stream
,这会删除很多 collect()
/stream()
调用:
private static Stream<String> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
if (items == null || items.isEmpty())
return Stream.empty();
List<CompletableFuture<Stream<String>>> list =
items.stream()
.map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
.map(item -> item.thenApplyAsync(strings -> fetch(es, dataProvider, strings), es))
.collect(Collectors.toList());
return list.stream().flatMap(CompletableFuture::join);
}
为了达到这个目的,改变了以下内容:
- 将
forEach()
/list.add()
替换为collect.stream(…).map(…).collect(toList())
- 内联
collect
变量 - 删除结果
.collect(toList()).stream()
,因为它是多余的 - 将return类型更改为
Stream<String>
,主要变化是: ** 将list
的类型更改为List<CompletableFuture<Stream<String>>>
(因为内部Stream
来自第二个map(… -> fetch())
调用 ** 从 return 语句 中删除
collect()
和 stream()
请注意,您需要保留中间 list
以确保在第一次调用 CompletableFuture::join
之前提供所有任务。
这样更好,但调用仍然是同步的并且需要 ForkJoinPool
才能工作。既然fetch()
里面的逻辑大部分是异步的,为什么不让整个方法异步呢?
这主要是需要把return的类型改成CompletableFuture<Stream<String>>
,然后用allOf()
创建一个future,当所有的中间future都完成的时候才会完成:
private static CompletableFuture<Stream<String>> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
if (items == null || items.isEmpty())
return CompletableFuture.completedFuture(Stream.empty());
List<CompletableFuture<Stream<String>>> list =
items.stream()
.map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
.map(item -> item.thenComposeAsync(strings -> fetch(es, dataProvider, strings), es))
.collect(Collectors.toList());
return CompletableFuture.allOf(list.toArray(new CompletableFuture[0]))
.thenApply(dummy -> list.stream().flatMap(CompletableFuture::join));
}
现在这是 100% 异步的,因为 join()
只会在已知已完成的 future 上调用(感谢 allOf()
),因此永远不会阻塞。
它不再需要 ForkJoinPool
。事实上,它甚至可以 运行 上一个 newFixedThreadPool(1)
!好吧,只要你在最后调用 shutdown()
就可以了……