将 parallelStream 用于独立任务?

using parallelStream for independent tasks?

我有一个任务列表。每个任务彼此 独立 (它们不使用彼此的结果)。

当有 1000 个任务并使用顺序流处理这些任务时..

tasks.forEach(task->{
            // long running task
            task.run();
            System.out.println("Thread: " + Thread.currentThread().getName());
        });

..那么,第二个任务是 运行 在第一个任务之后,依此类推。循环在阻塞和顺序模式下 运行(第二个任务仅在第一个任务完成后完成)。

并行处理每个任务的最佳方式是什么?

这是最好的方法吗?

tasks.parallelStream().forEach(task->{
            // long running task
            task.run();
            System.out.println("Thread: " + Thread.currentThread().getName());
        });

根据Should I always use a parallel stream when possible?,应该避免使用并行流。就我而言,这些任务彼此 独立 ,我不需要使用 parallelStream() 带来的 同步开销 。但是,在使用 parallelStream() 时没有禁用同步开销的选项。要么?

我的用例有比 parallelStream() 更好的方法吗?

在Java8parallelStream()中使用ForkJoinCommonPool,它在JVM启动时初始化,包含固定数量的线程,更适合工作在"divide and conquer"范式。在您的情况下,由于它们都是孤立的,因此使用 ExecutorService 可能更合适。

一个好的解决方案是使用 CompletableFuture.allOf。像这样使用它:

ExecutorService ex = //Whatever executor you want;

CompletableFuture.allOf((CompletableFuture<Void>[]) tasks.stream()
        .map(task -> CompletableFuture.runAsync((() -> /* Do task */), ex))
        .toArray());

这样做,你可以执行异步,非阻塞。此外,您将收到有关类型转换的编译器警告,但我认为在您的情况下,忽略它可能是安全的。

ExecutorService.submit 将触发任务,但是当您使用 get 获取任何结果时,它会阻塞然后检索。 CompletableFuture 获取数据时不阻塞。当您希望在所有并行任务完成后返回某种结果时,就会出现这种情况。
可以找到更多解释 here

此外,在您最初的问题中,您询问使用 parallelStream 是否是个好主意,我对此的回答是这不是一个好主意,因为如果有任务阻塞线程然后你会遇到问题(假设你在你的代码中到处使用 parallelStream )。

此外,CompletableFuture 可以接受它自己的线程池(您可以自定义线程池)和 运行。注意上面代码中 runAsync 的第二个参数。

如果您只是想拥有一个即发即弃的机制而不关心结果,那么使用 ExecutorService.invokeAll 是一个很好的方法。你可以这样使用它:

 executorService.invokeAll(tasks.stream().map(task -> new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    // run task;
                    return null;
                }
            })
.collect(Collectors.toList()));  

但在这种情况下,为什么要将 CompletableFuture 与自己的 ExecutorService 一起使用?
一个很好的理由是流畅的错误处理。你可以看到一些例子here and here