Java ParallelStream:多张地图或单张地图

Java ParallelStream: several map or single map

简介

我目前正在开发一个我在其中使用 Java.util.Collection.parallelStream() 的程序,想知道是否有可能让它变得更加多线程。

几张小地图

我想知道使用多个 map 是否可以让 Java.util.Collection.parallelStream() 更好地分配任务:

List<InsertOneModel<Document>> bulkWrites = puzzles.parallelStream()
        .map(gson::toJson)
        .map(Document::parse)
        .map(InsertOneModel::new)
        .toList();

单人大地图

例如比以下更好的分布:

List<InsertOneModel<Document>> bulkWrites = puzzles.parallelStream()
        .map(puzzle -> new InsertOneModel<>(Document.parse(gson.toJson(puzzle))))
        .toList();

问题

有没有一种解决方案更适合Java.util.Collection.parallelStream(),或者两者没有太大区别?

我查看了 Stream 源代码。映射操作的结果只是被送入下一个操作。所以一个大 map() 调用和几个小 map() 调用之间几乎没有区别。

而对于 map() 操作,并行 Stream 没有任何区别。这意味着在任何情况下,每个输入对象都将被相同的 Thread 处理直到结束。

另请注意:并行 Stream 仅在操作链允许且有足够的数据要处理时才拆分工作。因此,对于不允许随机访问的小型 CollectionCollection,并行 Stream 的行为类似于顺序 Stream.

我怀疑在性能上有很大的不同,但即使你证明它确实有更快的性能,我仍然更愿意在我必须维护的代码中看到和使用第一种风格。

第一个 multi-map 风格更容易让其他人理解,更容易维护和调试 - 例如为处理链的任何阶段添加 peek 阶段。

List<InsertOneModel<Document>> bulkWrites = puzzles.parallelStream()
    .map(gson::toJson)
    // easy to make changes for debug, moving peek up/down
    // .peek(System.out::println)
    .map(Document::parse)
    // easy to filter:
    // .filter(this::somecondition)
    .map(InsertOneModel::new)
    .toList();

如果您的要求发生变化 - 例如需要过滤输出,或通过拆分为 2 个集合来捕获中间数据,第一种方法每次都胜过第二种。

我认为如果将它与多张地图链接起来效果不会更好。如果您的代码不是很复杂,我更愿意使用 单个大地图
要理解这一点,我们必须检查 map 函数中的代码。 link

public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

如您所见,幕后发生了很多事情。创建了多个对象并调用了多个方法。因此,对于每个链式 map 函数调用,所有这些都会重复。

现在回到 ParallelStreams,他们致力于 Parallelism 的概念。
Streams Documentation


并行流是将其元素拆分为多个块的流,使用不同的线程处理每个块。因此,您可以在多核处理器的所有内核上自动划分给定操作的工作负载,并让所有内核保持同样的忙碌。

Parallel 流在内部使用默认值 ForkJoinPool,默认情况下线程数与处理器数一样多,由 Runtime.getRuntime().availableProcessors() 返回。但是您可以使用系统 属性 java.util.concurrent.ForkJoinPool.common.parallelism.

更改此池的大小

ParallelStream 在集合对象上调用 spliterator(),其中 returns 一个 Spliterator 提供拆分任务逻辑的实现。每个源或集合都有自己的拆分器实现。使用这些拆分器,并行流将任务拆分得尽可能长,最后当任务变得太小时它会按顺序执行它并合并所有子任务的部分结果。

所以我更喜欢parallelStream

  • 我一次要处理大量数据
  • 我有多个核心来处理数据
  • 现有实施的性能问题
  • 我已经没有多线程进程 运行,因为它会增加复杂性。

性能影响

  • 开销:有时,当数据集很小时,将顺序流转换为并行流会导致性能下降。管理 threads、来源和结果的开销比实际工作要昂贵。
  • 拆分Arrays 可以廉价且均匀地拆分,而 LinkedList 具有 none 这些属性。 TreeMapHashSet 拆分比 LinkedList 好,但不如数组。
  • Merging:合并操作对于某些操作来说确实很便宜,比如归约和加法,但是像分组到集合或映射这样的合并操作可能会非常昂贵。

结论:大量数据和每个元素进行的大量计算表明并行性可能是一个不错的选择。

三个步骤 (toJson/parse/new) 必须按顺序执行,因此您所做的只是比较 s.map(g.compose(f))s.map(f).map(g)。由于是 monad,Java Streams 是函子,而第二函子定律指出,本质上,s.map(g.compose(f)) == s.map(f).map(g),这意味着表达计算的两种替代方式将产生相同的结果。从性能的角度来看,两者之间的差异可能很小。

但是,一般来说您应该小心使用 Collection.parallelStream。它使用 common forkJoinPool,本质上是一个在整个 JVM 中共享的固定线程池。池的大小由主机上的内核数决定。使用公共池的问题在于同一进程中的其他线程也可能与您的代码同时使用它。这可能会导致您的代码随机且莫名其妙地变慢 - 例如,如果代码的另一部分暂时耗尽了公共线程池。

更可取的是使用 Executors 上的创建方法之一创建您自己的 ExecutorService,然后将您的任务提交给它。

private static final ExecutorService EX_SVC = Executors.newFixedThreadPool(16);

public static List<InsertOneModel<Document>> process(Stream<Puzzle> puzzles) throws InterruptedException {
    final Collection<Callable<InsertOneModel<Document>>> callables =
            puzzles.map(puzzle ->
                    (Callable<InsertOneModel<Document>>)
                            () -> new InsertOneModel<>(Document.parse(gson.toJson(puzzle)))
            ).collect(Collectors.toList());

    return EX_SVC.invokeAll(callables).stream()
            .map(fut -> {
                try {
                    return fut.get();
                } catch (ExecutionException|InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
            }).collect(Collectors.toList());
}