这是 Files.lines() 中的错误,还是我对并行流有误解?

Is this a bug in Files.lines(), or am I misunderstanding something about parallel streams?

环境:Ubuntu x86_64 (14.10),Oracle JDK 1.8u25

我尝试在第一行使用 Files.lines() but I want to .skip() 的并行流(这是一个带有 header 的 CSV 文件)。因此我尝试这样做:

try (
    final Stream<String> stream = Files.lines(thePath, StandardCharsets.UTF_8)
        .skip(1L).parallel();
) {
    // etc
}

但随后一列无法解析为 int...

所以我尝试了一些简单的代码。文件问题非常简单:

$ cat info.csv 
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
1422758875023;34;54;151;4375;4375;27486
$

而且代码同样简单:

public static void main(final String... args)
{
    final Path path = Paths.get("/home/fge/tmp/dd/info.csv");
    Files.lines(path, StandardCharsets.UTF_8).skip(1L).parallel()
        .forEach(System.out::println);
}

系统地 得到以下结果(好的,我只 运行 它大约 20 次):

startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes

我在这里错过了什么?


编辑 问题或误解似乎比这更根深蒂固(下面的两个例子是由 FreeNode 上的一个人编造的 ##java):

public static void main(final String... args)
{
    new BufferedReader(new StringReader("Hello\nWorld")).lines()
        .skip(1L).parallel()
        .forEach(System.out::println);

    final Iterator<String> iter
        = Arrays.asList("Hello", "World").iterator();
    final Spliterator<String> spliterator
        = Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED);
    final Stream<String> s
        = StreamSupport.stream(spliterator, true);

    s.skip(1L).forEach(System.out::println);
}

这会打印:

Hello
Hello

嗯。

@Holger 建议对于任何 ORDERED 而不是 SIZED 的流都会发生这种情况,其他示例:

Stream.of("Hello", "World")
    .filter(x -> true)
    .parallel()
    .skip(1L)
    .forEach(System.out::println);

此外,根据已经发生的所有讨论,问题(如果是一个?)与 .forEach()(如 )有关。

问题是您将并行流与 forEach 一起使用,并且您期望跳过操作依赖于正确的元素顺序,但此处并非如此。 forEach 文档摘录:

For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism.

我想基本上发生的事情是跳过操作首先在第二行执行,而不是在第一行。如果您使流顺序或使用 forEachOrdered you can see that then it produces the expected result. Another approach would be to use Collectors.

让我引用一些相关的东西——skip 的 Javadoc:

While skip() is generally a cheap operation on sequential stream pipelines, it can be quite expensive on ordered parallel pipelines, especially for large values of n, since skip(n) is constrained to skip not just any n elements, but the first n elements in the encounter order.

现在,可以肯定的是 Files.lines() 具有 明确定义的相遇顺序并且是 ORDERED 流(如果不是,则有即使在顺序操作中也不能保证遇到顺序与文件顺序匹配),因此可以保证生成的流将确定性地仅包含示例中的第二行。

不管有没有其他的,保证是肯定有的。

此答案已过时 - 请改为阅读


快速回答问题:观察到的行为是有意的!没有错误,一切都根据文档发生。但可以这么说,应该更好地记录和沟通这种行为。 forEach 如何忽略排序应该更加明显。

我将首先介绍允许观察到的行为的概念。这为剖析问题中给出的 示例 之一提供了背景。我将在高级别上执行此操作,然后在非常低级别上再次执行此操作。

[TL;DR: 自行阅读,高水平解释会给出一个粗略的答案。]

概念

我们不讨论 Streams,它是流相关方法操作或 returned 的类型,我们来谈谈 流操作流管道。方法调用 linesskipparallel 是构建流管道 [1] 的流操作,并且 - 正如其他人所指出的那样 - 当终端操作 forEach 被称为 [2].

管道可以被认为是一系列操作,一个接一个地在整个流上执行(例如过滤所有元素,将剩余元素映射到数字,对所有数字求和)。 但这是一种误导! 一个更好的比喻是终端操作通过每个操作拉取单个元素[3](例如获取下一个未过滤的元素,映射它,将它添加到总和,请求下一个元素)。一些中间操作可能需要遍历几个(例如skip)甚至所有(例如sort)元素才能return请求的下一个元素,这是状态的来源之一在手术中。

每个操作都用这些 StreamOpFlag 表示其特征:

  • DISTINCT
  • SORTED
  • ORDERED
  • SIZED
  • SHORT_CIRCUIT

它们结合了流源、中间操作和终端操作,构成了管道(作为一个整体)的特征,然后用于优化[4]。同理一个pipeline是否并行执行是整个pipeline[5]的一个属性.

因此,无论何时您对这些特征做出假设,都必须仔细查看构建管道的所有操作,无论它们的应用顺序如何,以及什么保证他们做了。这样做时请记住终端操作如何通过管道拉动每个单独的元素。

例子

让我们看看这个特例:

BufferedReader fooBarReader = new BufferedReader(new StringReader("Foo\nBar"));
fooBarReader.lines()
        .skip(1L)
        .parallel()
        .forEach(System.out::println);

高级别

无论您的流源是否有序(它是),通过调用 forEach(而不是 forEachOrdered)您声明 order 不重要you[6],有效地将skip从"skip the first n elements"减少到"skip any n elements"[7](因为没有顺序前者变得毫无意义)。

因此,如果承诺加速,您就赋予管道忽略顺序的权利。对于并行执行,它显然是这么认为的,这就是为什么你得到观察到的输出。因此,您观察到的是预期行为,没有错误。

请注意,这 skip 是有状态的 并不冲突!如上所述,有状态并不意味着它以某种方式缓存整个流(减去跳过的元素)并且随后的所有内容都在这些元素上执行。这只是意味着该操作有一些状态 - 即跳过的元素的数量(好吧,它实际上不是 that easy 但由于我对正在发生的事情的理解有限,我会说这是一个公平的简化)。

低​​级别

让我们更详细地看一下:

  1. BufferedReader.lines 创建 Stream,我们称它为 _lines
  2. .skip 创建一个新的 Stream,我们称它为 _skip
    • 来电ReferencePipeline.skip
    • SliceOps.makeRef
    • 构造一个"slice"操作(skip & limit的泛化)
    • 这创建了一个 ReferencePipeline.StatefulOp 的匿名实例,它引用 _lines 作为它的来源
  3. .parallel 如上所述为整个管道设置并行标志
  4. .forEach实际开始执行

那么让我们看看管道是如何执行的:

  1. 调用 _skip.forEach creates a ForEachOp (let's call it _forEach) and hands it to _skip.evaluate,它会做两件事:
    1. 调用 sourceSpliterator 围绕此流水线阶段的源创建拆分器:
    2. 调用_forEach.evaluateParallel which creates a ForEachTask(因为它是无序的;我们称它为_forEachTask)并调用它
  2. _forEachTask.compute 中,任务拆分前 1024 行,为其创建一个新任务(我们称之为 _forEachTask2),意识到没有剩余行并完成。
  3. 在分叉连接池中,_forEachTask2.compute gets called, vainly tries to split again and finally starts copying its elements into the sink (a stream-aware wrapper around the System.out.println) by calling _skip.copyInto
  4. 这实际上是将任务委托给指定的拆分器。 这是上面创建的_sliceSpliterator所以_sliceSpliterator.forEachRemaining负责将未跳过的元素交给println-sink:
    1. 它将一大块(在本例中为所有)行放入缓冲区并对它们进行计数
    2. 它尝试通过 acquirePermits
    3. 请求尽可能多的许可(我假设是由于并行化)
    4. 源中有两个元素和一个要跳过的元素,它只获得一个许可(通常假设 n
    5. 它让缓冲区将第一个 n 个元素(所以在这种情况下只有第一个)放入接收器

所以 UnorderedSliceSpliterator.OfRef.forEachRemaining 是最终真正忽略订单的地方。 我没有将它与订购的变体进行比较,但这是我的假设,为什么这样做方式:

  • 在并行化下将拆分器的元素铲入缓冲区可能会与其他执行相同操作的任务交错
  • 这将使跟踪他们的订单变得极其困难
  • 这样做或防止交错会降低性能,如果顺序不相关则毫无意义
  • 如果顺序丢失,除了处理第一个 n 个允许的元素外别无他法

有什么问题吗? ;) 抱歉这么久了。也许我应该省略细节并制作一个博客post....

来源

[1] java.util.stream - Stream operations and pipelines:

Stream operations are divided into intermediate and terminal operations, and are combined to form stream pipelines.

[2] java.util.stream - Stream operations and pipelines:

Traversal of the pipeline source does not begin until the terminal operation of the pipeline is executed.

[3] 这个比喻代表了我对流的理解。除了代码之外,主要来源是来自 java.util.stream - Stream operations and pipelines 的引述(突出显示我的):

Processing streams lazily allows for significant efficiencies; in a pipeline such as the filter-map-sum example above, filtering, mapping, and summing can be fused into a single pass on the data, with minimal intermediate state. Laziness also allows avoiding examining all the data when it is not necessary; for operations such as "find the first string longer than 1000 characters", it is only necessary to examine just enough strings to find one that has the desired characteristics without examining all of the strings available from the source.

[4] java.util.stream.StreamOpFlag:

At each stage of the pipeline, a combined stream and operation flags can be calculated [... jadda, jadda, jadda about how flags are combined across source, intermediate and terminal operations ...] to produce the flags output from the pipeline. Those flags can then be used to apply optimizations.

在代码中,您可以在 AbstractPipeline.combinedFlags 中看到这一点,它是在构造期间(以及在其他一些事件中)通过结合前一个操作和新操作的标志来设置的。

[5] java.util.stream - Parallelism(我不能直接 link - 向下滚动一点):

When the terminal operation is initiated, the stream pipeline is executed sequentially or in parallel depending on the orientation of the stream on which it is invoked.

在代码中你可以看到它在 AbstractPipeline.sequential, parallel, and isParallel 中,set/check 流源上的一个布尔标志,使得它在构建流时调用 setter 时无关紧要。

[6] java.util.stream.Stream.forEach:

Performs an action for each element of this stream. [...] The behavior of this operation is explicitly nondeterministic.

对比java.util.stream.Stream.forEachOrdered

Performs an action for each element of this stream, in the encounter order of the stream if the stream has a defined encounter order.

[7] 这也没有明确记录,但我对 Stream.skip 上的评论的解释(我大大缩短):

[...] skip() [...] can be quite expensive on ordered parallel pipelines [...] since skip(n) is constrained to skip not just any n elements, but the first n elements in the encounter order. [...] [R]emoving the ordering constraint [...] may result in significant speedups of skip() in parallel pipelines

我知道如何解决这个问题,我在之前的讨论中看不到。您可以重新创建流,将管道分成两个管道,同时保持整个过程惰性。

public static <T> Stream<T> recreate(Stream<T> stream) {
    return StreamSupport.stream(stream.spliterator(), stream.isParallel())
                        .onClose(stream::close);
}

public static void main(String[] args) {
    recreate(new BufferedReader(new StringReader("JUNK\n1\n2\n3\n4\n5")).lines()
        .skip(1).parallel()).forEach(System.out::println);
}

当您从初始流拆分器重新创建流时,您实际上创建了一个新管道。在大多数情况下,recreate 将作为 no-op 工作,但问题是第一和第二管道不共享 parallelunordered 状态。因此,即使您使用 forEach(或任何其他无序终端操作),也只有第二个流变得无序。

内部非常相似的事情是将您的流与空流连接:

Stream.concat(Stream.empty(), 
    new BufferedReader(new StringReader("JUNK\n1\n2\n3\n4\n5"))
          .lines().skip(1).parallel()).forEach(System.out::println);

虽然它有更多的开销。

由于问题的当前状态与此处所做的早期陈述完全相反,应该注意的是,现在有一个 about the back-propagation of the unordered characteristic past a skip operation is considered a bug. 现在被认为没有反向传播终端操作的有序性。

还有一个related bug report, JDK-8129120 whose status is “fixed in Java 9” and it’s backported to Java 8, update 60

我用 jdk1.8.0_60 做了一些测试,看起来现在的实现确实表现出了更直观的行为。