并行无限 Java 流 运行 内存不足

Parallel Infinite Java Streams run out of Memory

我试图理解为什么下面的 Java 程序给出了 OutOfMemoryError,而没有 .parallel() 的相应程序却没有。

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .limit(100_000_000)
    .sum()
);

我有两个问题:

  1. 这个程序的预期输出是什么?

    没有 .parallel() 似乎这只是输出 sum(1+2+3+...) 这意味着它只是 "gets stuck" 在 flatMap 中的第一个流,这是有道理的。

    并行我不知道是否有预期的行为,但我的猜测是它以某种方式交错了第一个 n 左右的流,其中 n 是并行的数量工作人员。根据 chunking/buffering 行为,它也可能略有不同。

  2. 什么原因导致 运行 内存不足? 我特别想了解这些流是如何在幕后实现的。

    我猜有些东西阻塞了流,所以它永远不会完成并且能够摆脱生成的值,但我不太清楚事物的评估顺序和缓冲发生的位置。

编辑: 如果相关,我使用 Java 11.

Editt 2: 显然即使是简单的程序 IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum() 也会发生同样的事情,所以这可能与 limit 的懒惰有关比 flatMap.

我最好的猜测是添加 parallel() 会改变 flatMap() 的内部行为,而 .

您遇到的 OutOfMemoryError 错误已在 [JDK-8202307] Getting a java.lang.OutOfMemoryError: Java heap space when calling Stream.iterator().next() on a stream which uses an infinite/very big Stream in flatMap 中报告。如果您查看票证,它或多或少与您获得的堆栈跟踪相同。工单已关闭,原因如下:

The iterator() and spliterator() methods are "escape hatches" to be used when it's not possible to use other operations. They have some limitations because they turn what is a push model of the stream implementation into a pull model. Such a transition requires buffering in certain cases, such as when an element is (flat) mapped to two or more elements. It would significantly complicate the stream implementation, likely at the expense of common cases, to support a notion of back-pressure to communicate how many elements to pull through nested layers of element production.

OOME 不是 流是无限的,而是它不是

也就是说,如果您注释掉 .limit(...),它永远不会 运行 内存不足——当然,它也永远不会结束。

拆分后,如果元素在每个线程中累积,则流只能跟踪元素的数量(看起来实际的累加器是 Spliterators$ArraySpliterator#array)。

看起来你可以在没有 flatMap 的情况下重现它,只是 运行 以下 -Xmx128m:

    System.out.println(Stream
            .iterate(1, i -> i + 1)
            .parallel()
      //    .flatMap(n -> Stream.iterate(n, i -> i+n))
            .mapToInt(Integer::intValue)
            .limit(100_000_000)
            .sum()
    );

但是,在注释掉 limit() 之后,它应该 运行 没问题,直到您决定保留笔记本电脑。

除了实际的实施细节,以下是我认为正在发生的事情:

对于 limitsum reducer 希望对前 X 个元素求和,因此没有线程可以发出部分求和。每个 "slice" (线程)都需要积累元素并传递它们。没有限制,就没有这样的约束,所以每个 "slice" 只会计算它得到的元素的部分和(永远),假设它最终会发出结果。

你说“但我不太清楚事物的评估顺序和缓冲发生的位置”,这正是并行流的意义所在。评估顺序未指定。

您示例的一个关键方面是 .limit(100_000_000)。这意味着该实现不能只对任意值求和,而必须对 前 100,000,000 个数字求和。请注意,在参考实现中,.unordered().limit(100_000_000) 不会更改结果,这表明没有针对无序情况的特殊实现,但这是一个实现细节。

现在,当工作线程处理元素时,它们不能只是将它们相加,因为它们必须知道允许它们使用哪些元素,这取决于在它们的特定工作负载之前有多少元素。由于此流不知道大小,因此只有在处理了前缀元素后才能知道,而无限流永远不会发生这种情况。所以工作线程暂时保持缓冲,此信息可用。

原则上,当工作线程知道它正在处理最左边的¹工作块时,它可以立即对元素求和,对它们进行计数,并在达到限制时发出结束信号。所以 Stream 可以终止,但这取决于很多因素。

在您的情况下,一个合理的场景是其他工作线程在分配缓冲区方面比​​最左边的作业正在计数更快。在这种情况下,对时间的细微更改可能会使流偶尔 return 具有值。

当我们减慢除处理最左边块的工作线程之外的所有工作线程时,我们可以使流终止(至少在大多数运行中):

System.out.println(IntStream
    .iterate(1, i -> i+1)
    .parallel()
    .peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
    .flatMap(n -> IntStream.iterate(n, i -> i+n))
    .limit(100_000_000)
    .sum()
);

¹ 我遵循 在谈论遭遇顺序而不是处理顺序时使用从左到右的顺序。