Java 流操作融合和有状态中间操作

Java stream operation fusion and stateful intermediate operations

我一直在努力理解和展示 Java 流如何在幕后实现一种循环融合,以便可以将多个操作融合到一个通道中。

这里是第一个例子:

Stream.of("The", "cat", "sat", "on", "the", "mat")
        .filter(w -> {
            System.out.println("Filtering: " + w);
            return w.length() == 3;
        })
        .map(w -> {
            System.out.println("Mapping: " + w);
            return w.toUpperCase();
        })
        .forEach(w -> System.out.println("Printing: " + w));

具有以下输出(每个元素的单通道融合非常清楚):

Filtering: The
Mapping: The
Printing: THE
Filtering: cat
Mapping: cat
Printing: CAT
Filtering: sat
Mapping: sat
Printing: SAT
Filtering: on
Filtering: the
Mapping: the
Printing: THE
Filtering: mat
Mapping: mat
Printing: MAT

第二个例子是一样的,但我在过滤器和映射之间使用了 sorted() 操作:

Stream.of("The", "cat", "sat", "on", "the", "mat")
        .filter(w -> {
            System.out.println("Filtering: " + w);
            return w.length() == 3;
        })
        .sorted()
        .map(w -> {
            System.out.println("Mapping: " + w);
            return w.toUpperCase();
        })
        .forEach(w -> System.out.println("Printing: " + w));

这有以下输出:

Filtering: The
Filtering: cat
Filtering: sat
Filtering: on
Filtering: the
Filtering: mat
Mapping: The
Printing: THE
Mapping: cat
Printing: CAT
Mapping: mat
Printing: MAT
Mapping: sat
Printing: SAT
Mapping: the
Printing: THE

所以我的问题在这里,通过调用 distinct,我的想法是否正确,因为它是一个 "stateful" 中间操作,它不允许在单次传递期间单独处理单个元素(所有操作)。此外,由于sorted()有状态操作需要处理整个输入流才能产生结果,所以这里不能部署融合技术,所以所有的过滤都先发生,然后将映射和打印操作融合在一起,排序后?如果我的任何假设不正确,请纠正我,并随时详细说明我已经说过的内容。

此外,它如何在幕后决定是否可以将元素融合到一个通道中,例如,当 distinct() 操作存在时,是否有一个简单的标志关闭以停止它不会像 distinct() 不存在时那样发生?

最后一个问题是,虽然将操作融合到单次传递中的好处有时是显而易见的,例如,当与短路结合使用时。将 filter-map-forEach 甚至 filter-map-sum 等操作融合在一起的主要好处是什么?

是的,差不多是这样。所有这些都可以通过查看源代码来检查。

不过,Fusion 并没有像您认为的那样实现。无需查看整个管道并决定如何融合它;没有旗帜或任何东西;只是操作是否被表示为 StatefulOp 对象,它可以 运行 整个流到那个点并获得所有输出,或者 StatelessOp 只是装饰 Sink 表示元素的去向。您可以查看源代码,例如sorted and map 例如。

无状态操作(map、filter、flatMap、peek等)完全融合;我们构建了一个级联 Consumer 对象链并将数据倒入其中。每个元素都可以相互独立地进行操作,因此链中永远不会有任何东西 "stuck"。 (这就是 Louis 所说的如何实现融合的意思——我们将阶段组合成一个大函数,并将数据提供给它。)

Stateful 操作(distinct、sorted、limit 等)更复杂,并且其行为变化更大。每个有状态操作都可以选择它自己的实现方式,因此它可以选择尽可能少的侵入性方法。例如,distinct(在某些情况下)让元素在审查时出来,而 sorted 是一个完整的障碍。 (不同之处在于懒惰的可能性有多大,以及它们如何处理诸如无限源之类的下游限制操作。)

的确,有状态的操作通常会破坏融合的一些好处,但不是全部(上游和下游的操作仍然可以融合。)

除了您观察到的短路值之外,融合的其他重大优势还包括 (a) 您不必在阶段之间填充中间结果容器,以及 (b) 您正在处理的数据with 总是 "hot" 在缓存中。