将 parallelStream 用于独立任务?
using parallelStream for independent tasks?
我有一个任务列表。每个任务彼此 独立 (它们不使用彼此的结果)。
当有 1000 个任务并使用顺序流处理这些任务时..
tasks.forEach(task->{
// long running task
task.run();
System.out.println("Thread: " + Thread.currentThread().getName());
});
..那么,第二个任务是 运行 在第一个任务之后,依此类推。循环在阻塞和顺序模式下 运行(第二个任务仅在第一个任务完成后完成)。
并行处理每个任务的最佳方式是什么?
这是最好的方法吗?
tasks.parallelStream().forEach(task->{
// long running task
task.run();
System.out.println("Thread: " + Thread.currentThread().getName());
});
根据Should I always use a parallel stream when possible?,应该避免使用并行流。就我而言,这些任务彼此 独立 ,我不需要使用 parallelStream()
带来的 同步开销 。但是,在使用 parallelStream()
时没有禁用同步开销的选项。要么?
我的用例有比 parallelStream()
更好的方法吗?
在Java8parallelStream()
中使用ForkJoinCommonPool
,它在JVM启动时初始化,包含固定数量的线程,更适合工作在"divide and conquer"范式。在您的情况下,由于它们都是孤立的,因此使用 ExecutorService
可能更合适。
一个好的解决方案是使用 CompletableFuture.allOf
。像这样使用它:
ExecutorService ex = //Whatever executor you want;
CompletableFuture.allOf((CompletableFuture<Void>[]) tasks.stream()
.map(task -> CompletableFuture.runAsync((() -> /* Do task */), ex))
.toArray());
这样做,你可以执行异步,非阻塞。此外,您将收到有关类型转换的编译器警告,但我认为在您的情况下,忽略它可能是安全的。
ExecutorService.submit
将触发任务,但是当您使用 get
获取任何结果时,它会阻塞然后检索。 CompletableFuture
获取数据时不阻塞。当您希望在所有并行任务完成后返回某种结果时,就会出现这种情况。
可以找到更多解释 here。
此外,在您最初的问题中,您询问使用 parallelStream
是否是个好主意,我对此的回答是这不是一个好主意,因为如果有任务阻塞线程然后你会遇到问题(假设你在你的代码中到处使用 parallelStream
)。
此外,CompletableFuture
可以接受它自己的线程池(您可以自定义线程池)和 运行。注意上面代码中 runAsync
的第二个参数。
如果您只是想拥有一个即发即弃的机制而不关心结果,那么使用 ExecutorService.invokeAll
是一个很好的方法。你可以这样使用它:
executorService.invokeAll(tasks.stream().map(task -> new Callable<Void>() {
@Override
public Void call() throws Exception {
// run task;
return null;
}
})
.collect(Collectors.toList()));
但在这种情况下,为什么要将 CompletableFuture
与自己的 ExecutorService
一起使用?
一个很好的理由是流畅的错误处理。你可以看到一些例子here and here
我有一个任务列表。每个任务彼此 独立 (它们不使用彼此的结果)。
当有 1000 个任务并使用顺序流处理这些任务时..
tasks.forEach(task->{
// long running task
task.run();
System.out.println("Thread: " + Thread.currentThread().getName());
});
..那么,第二个任务是 运行 在第一个任务之后,依此类推。循环在阻塞和顺序模式下 运行(第二个任务仅在第一个任务完成后完成)。
并行处理每个任务的最佳方式是什么?
这是最好的方法吗?
tasks.parallelStream().forEach(task->{
// long running task
task.run();
System.out.println("Thread: " + Thread.currentThread().getName());
});
根据Should I always use a parallel stream when possible?,应该避免使用并行流。就我而言,这些任务彼此 独立 ,我不需要使用 parallelStream()
带来的 同步开销 。但是,在使用 parallelStream()
时没有禁用同步开销的选项。要么?
我的用例有比 parallelStream()
更好的方法吗?
在Java8parallelStream()
中使用ForkJoinCommonPool
,它在JVM启动时初始化,包含固定数量的线程,更适合工作在"divide and conquer"范式。在您的情况下,由于它们都是孤立的,因此使用 ExecutorService
可能更合适。
一个好的解决方案是使用 CompletableFuture.allOf
。像这样使用它:
ExecutorService ex = //Whatever executor you want;
CompletableFuture.allOf((CompletableFuture<Void>[]) tasks.stream()
.map(task -> CompletableFuture.runAsync((() -> /* Do task */), ex))
.toArray());
这样做,你可以执行异步,非阻塞。此外,您将收到有关类型转换的编译器警告,但我认为在您的情况下,忽略它可能是安全的。
ExecutorService.submit
将触发任务,但是当您使用 get
获取任何结果时,它会阻塞然后检索。 CompletableFuture
获取数据时不阻塞。当您希望在所有并行任务完成后返回某种结果时,就会出现这种情况。
可以找到更多解释 here。
此外,在您最初的问题中,您询问使用 parallelStream
是否是个好主意,我对此的回答是这不是一个好主意,因为如果有任务阻塞线程然后你会遇到问题(假设你在你的代码中到处使用 parallelStream
)。
此外,CompletableFuture
可以接受它自己的线程池(您可以自定义线程池)和 运行。注意上面代码中 runAsync
的第二个参数。
如果您只是想拥有一个即发即弃的机制而不关心结果,那么使用 ExecutorService.invokeAll
是一个很好的方法。你可以这样使用它:
executorService.invokeAll(tasks.stream().map(task -> new Callable<Void>() {
@Override
public Void call() throws Exception {
// run task;
return null;
}
})
.collect(Collectors.toList()));
但在这种情况下,为什么要将 CompletableFuture
与自己的 ExecutorService
一起使用?
一个很好的理由是流畅的错误处理。你可以看到一些例子here and here