Stream.skip 无序终端操作的行为

Stream.skip behavior with unordered terminal operation

我已经阅读了 and 个问题,但仍然怀疑观察到的 Stream.skip 行为是否是 JDK 作者的意图。

让我们简单地输入数字 1..20:

List<Integer> input = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());

现在让我们创建一个并行流,将 unordered()skip() 以不同的方式组合并收集结果:

System.out.println("skip-skip-unordered-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .skip(1)
            .unordered()
            .collect(Collectors.toList()));
System.out.println("skip-unordered-skip-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .unordered()
            .skip(1)
            .collect(Collectors.toList()));
System.out.println("unordered-skip-skip-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .unordered()
            .skip(1)
            .skip(1)
            .collect(Collectors.toList()));

过滤步骤在这里基本上什么都不做,但增加了流引擎的难度:现在它不知道输出的确切大小,因此关闭了一些优化。我得到以下结果:

skip-skip-unordered-toList: [3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
// absent values: 1, 2
skip-unordered-skip-toList: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16, 17, 18, 19, 20]
// absent values: 1, 15
unordered-skip-skip-toList: [1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 19, 20]
// absent values: 7, 18

结果完全没问题,一切正常。在第一种情况下,我要求跳过前两个元素,然后以无特定顺序收集列表。在第二种情况下,我要求跳过第一个元素,然后变成无序并再跳过一个元素(我不在乎是哪个元素)。第三种情况我先转成无序模式,然后跳过任意两个元素。

我们跳过一个元素,以无序方式收集到自定义集合中。我们的自定义集合将是 HashSet:

System.out.println("skip-toCollection: "
        + input.parallelStream().filter(x -> x > 0)
        .skip(1)
        .unordered()
        .collect(Collectors.toCollection(HashSet::new)));

输出满意:

skip-toCollection: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
// 1 is skipped

所以一般来说,我希望只要流是有序的,skip() 就会跳过第一个元素,否则会跳过任意​​元素。

但是让我们使用等效的无序终端操作 collect(Collectors.toSet()):

System.out.println("skip-toSet: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .unordered()
            .collect(Collectors.toSet()));

现在输出是:

skip-toSet: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 17, 18, 19, 20]
// 13 is skipped

任何其他无序终端操作(如 forEachfindAnyanyMatch 等)都可以实现相同的结果。在这种情况下删除 unordered() 步骤没有任何改变。似乎 unordered() 步骤正确地使流从当前操作开始无序,无序终端操作使整个流从一开始就无序,尽管如果使用 skip() 这可能会影响结果。这似乎完全误导了我:我希望使用无序收集器与在终端操作之前将流转换为无序模式并使用等效的有序收集器是一样的。

所以我的问题是:

  1. 此行为是故意的还是错误?
  2. 如果是,是否在某处记录了它?我读过包摘要中的 Stream.skip() documentation: it does not say anything about unordered terminal operations. Also Characteristics.UNORDERED documentation is not very comprehend and does not say that ordering will be lost for the whole stream. Finally, Ordering 部分也没有涵盖这种情况。可能我遗漏了什么?
  3. 如果无序终端操作的目的是使整个流无序,为什么unordered()步骤才使其无序从这一点开始?我可以依赖这种行为吗?或者我只是很幸运,我的第一次测试效果很好?

@Ruben, you probably don't understand my question. Roughly the problem is: why unordered().collect(toCollection(HashSet::new)) behaves differently than collect(toSet()). Of course I know that toSet() is unordered.

可能吧,但是,无论如何,我会再试一次。

查看收集器 toSet 和 toCollection 的 Javadoc,我们可以看到 toSet 提供了一个 无序收集器

This is an {@link Collector.Characteristics#UNORDERED unordered} Collector.

即具有 UNORDERED 特性的 CollectorImpl。查看 Collector.Characteristics#UNORDERED 的 Javadoc,我们可以阅读:

Indicates that the collection operation does not commit to preserving the encounter order of input elements

在Collector的Javadoc中我们还可以看到:

For concurrent collectors, an implementation is free to (but not required to) implement reduction concurrently. A concurrent reduction is one where the accumulator function is called concurrently from multiple threads, using the same concurrently-modifiable result container, rather than keeping the result isolated during accumulation. A concurrent reduction should only be applied if the collector has the {@link Characteristics#UNORDERED} characteristics or if the originating data is unordered

这对我来说意味着,如果我们设置 UNORDERED 特征,我们根本不关心流元素传递到累加器的顺序,因此,可以按任何顺序从管道中提取元素。

顺便说一句,如果您在示例中省略 unordered() ,您会得到相同的行为:

    System.out.println("skip-toSet: "
            + input.parallelStream().filter(x -> x > 0)
                .skip(1)
                .collect(Collectors.toSet()));

另外,Stream中的skip()方法给了我们一个提示:

While {@code skip()} is generally a cheap operation on sequential stream pipelines, it can be quite expensive on ordered parallel pipelines

Using an unordered stream source (such as {@link #generate(Supplier)}) or removing the ordering constraint with {@link #unordered()} may result in significant speedups

使用时

Collectors.toCollection(HashSet::new)

你正在创建一个普通的 "ordered" 收集器(一个没有无序特征的收集器),对我来说意味着你确实关心顺序,因此,元素被按顺序提取,你得到预期的行为。

回想一下,流标志(ORDERED、SORTED、SIZED、DISTINCT)的目标是启用操作以避免做不必要的工作。涉及流标志的优化示例是:

  • 如果我们知道流已经排序,那么sorted()是一个空操作;
  • 如果我们知道流的大小,我们可以在 toArray() 中预先分配一个正确大小的数组,避免复制;
  • 如果我们知道输入没有有意义的相遇顺序,我们就不需要采取额外的步骤来保留相遇顺序。

管道的每个阶段都有一组流标志。中间操作可以注入、保留或清除流标志。例如,过滤保留排序/区别性但不保留大小;映射保留大小,但不保留排序或独特性。排序注入排序。中间操作的标志处理相当简单,因为所有决策都是本地的。

终端操作标志的处理更加微妙。 ORDERED 是与终端操作最相关的标志。如果终端操作是无序的,那么我们会反向传播无序性。

我们为什么要这样做?好吧,考虑这个管道:

set.stream()
   .sorted()
   .forEach(System.out::println);

既然forEach不拘泥于顺序操作,那么整理列表的工作完全是白费力气。所以我们反向传播这个信息(直到我们命中一个短路操作,比如limit),以免失去这个优化机会。同样,我们可以在无序流上使用 distinct 的优化实现。

Is this behavior intended or it's a bug?

是 :) 反向传播是有意的,因为它是一种有用的优化,不应产生不正确的结果。然而,错误部分是我们正在传播过去的 skip,这是我们不应该的。所以 UNORDERED 标志的反向传播过于激进,这是一个错误。我们会 post 一个错误。

If yes is it documented somewhere?

它应该只是一个实现细节;如果正确实施,您不会注意到(除非您的流速度更快。)