CompletableFuture 单个任务继续执行许多并行任务
CompletableFuture single task that continues with many parallel tasks
我有以下代码:
return CompletableFuture.supplyAsync(() -> {
return foo; // some custom object
})
.thenAccept(foo -> {
// ??? need to spawn N async parallel jobs that works on 'foo'
});
英文:第一个任务异步创建foo
对象;然后我需要 运行 N 个并行进程。
那么有没有更好的方法:
...
CompletableFuture[] parallel = new CompletableFuture[N];
for (int i = 0; i < N; i++) {
parallel[i] = CompletableFuture.runAsync(() -> {
work(foo);
});
}
CompletableFuture.allOf(parallel).join();
...
我不喜欢这样,因为一个线程在等待 N 个作业完成时被锁定。
因为 CompletableFuture.allOf
已经 returns 另一个 CompletableFuture<Void>
a 你可以再做一个 .thenAccept
并从 parallel
中的 CF 中提取返回值在回调中,这样你就可以避免调用 join
您可以在特定的先决条件作业上链接任意数量的独立作业,例如
CompletableFuture<Foo> base=CompletableFuture.supplyAsync(() -> new Foo());
Collections.nCopies(N, base).forEach(f -> f.thenAcceptAsync(foo -> work(foo)));
在提供 Foo
实例的初始作业完成后, 将生成 N
并行作业,并发调用 work(foo)
。
但请记住,底层框架将考虑可用 CPU 内核的数量来确定实际执行并行作业的线程池的大小,因此如果 N > #cores
,其中一些作业可能运行一个接着一个
如果工作是 I/O 绑定的,因此,你想要有更多的并行线程,你必须指定你自己的执行器。
nCopies
/forEach
不是必需的,for
循环也可以,但它提供了如何处理后续作业的提示,这取决于完成情况在所有这些并行作业中:
CompletableFuture<Foo> base=CompletableFuture.supplyAsync(() -> new Foo());
CompletableFuture<Void> all = CompletableFuture.allOf(
Collections.nCopies(N, base).stream()
.map(f -> f.thenAcceptAsync(foo -> work(foo)))
.toArray(CompletableFuture<?>[]::new));
现在您可以使用 all
检查所有作业的完成情况或链式附加操作。
我有以下代码:
return CompletableFuture.supplyAsync(() -> {
return foo; // some custom object
})
.thenAccept(foo -> {
// ??? need to spawn N async parallel jobs that works on 'foo'
});
英文:第一个任务异步创建foo
对象;然后我需要 运行 N 个并行进程。
那么有没有更好的方法:
...
CompletableFuture[] parallel = new CompletableFuture[N];
for (int i = 0; i < N; i++) {
parallel[i] = CompletableFuture.runAsync(() -> {
work(foo);
});
}
CompletableFuture.allOf(parallel).join();
...
我不喜欢这样,因为一个线程在等待 N 个作业完成时被锁定。
因为 CompletableFuture.allOf
已经 returns 另一个 CompletableFuture<Void>
a 你可以再做一个 .thenAccept
并从 parallel
中的 CF 中提取返回值在回调中,这样你就可以避免调用 join
您可以在特定的先决条件作业上链接任意数量的独立作业,例如
CompletableFuture<Foo> base=CompletableFuture.supplyAsync(() -> new Foo());
Collections.nCopies(N, base).forEach(f -> f.thenAcceptAsync(foo -> work(foo)));
在提供 Foo
实例的初始作业完成后, 将生成 N
并行作业,并发调用 work(foo)
。
但请记住,底层框架将考虑可用 CPU 内核的数量来确定实际执行并行作业的线程池的大小,因此如果 N > #cores
,其中一些作业可能运行一个接着一个
如果工作是 I/O 绑定的,因此,你想要有更多的并行线程,你必须指定你自己的执行器。
nCopies
/forEach
不是必需的,for
循环也可以,但它提供了如何处理后续作业的提示,这取决于完成情况在所有这些并行作业中:
CompletableFuture<Foo> base=CompletableFuture.supplyAsync(() -> new Foo());
CompletableFuture<Void> all = CompletableFuture.allOf(
Collections.nCopies(N, base).stream()
.map(f -> f.thenAcceptAsync(foo -> work(foo)))
.toArray(CompletableFuture<?>[]::new));
现在您可以使用 all
检查所有作业的完成情况或链式附加操作。