Java 8 Stream:limit() 和 skip() 的区别

Java 8 Stream: difference between limit() and skip()

说到Streams,当我执行这段代码时

public class Main {
    public static void main(String[] args) {
        Stream.of(1,2,3,4,5,6,7,8,9)
        .peek(x->System.out.print("\nA"+x))
        .limit(3)
        .peek(x->System.out.print("B"+x))
        .forEach(x->System.out.print("C"+x));
    }
}

我得到这个输出

A1B1C1
A2B2C2
A3B3C3

因为将我的流限制为前三个组件会强制执行操作 ABC 只执行三次。

尝试使用 skip() 方法对最后三个元素执行类似的计算,显示出不同的行为:this

public class Main {
    public static void main(String[] args) {
        Stream.of(1,2,3,4,5,6,7,8,9)
        .peek(x->System.out.print("\nA"+x))
        .skip(6)
        .peek(x->System.out.print("B"+x))
        .forEach(x->System.out.print("C"+x));
    }
}

输出这个

A1
A2
A3
A4
A5
A6
A7B7C7
A8B8C8
A9B9C9

为什么在这种情况下,正在执行操作 A1A6?一定是和limitshort-circuiting stateful intermediate operation,而skip不是,但我不明白这 属性 的实际意义。难道只是"every action before skip is executed while not everyone before limit is"?

流式管道的流畅符号是造成这种混淆的原因。这样想:

limit(3)

除了 forEach(),它是一个 terminal operation,它会触发 "execution of the pipeline".[=69,但所有流水线操作都是延迟计算的=]

执行管道时,中间流定义不会对发生的情况做出任何假设 "before""after"。他们所做的就是获取输入流并将其转换为输出流:

Stream<Integer> s1 = Stream.of(1,2,3,4,5,6,7,8,9);
Stream<Integer> s2 = s1.peek(x->System.out.print("\nA"+x));
Stream<Integer> s3 = s2.limit(3);
Stream<Integer> s4 = s3.peek(x->System.out.print("B"+x));

s4.forEach(x->System.out.print("C"+x));
  • s1 包含 9 个不同的 Integer 值。
  • s2 查看传递给它的所有值并打印它们。
  • s3 将前 3 个值传递给 s4 并在第三个值之后中止管道。 s3 没有产生更多的值。 这并不意味着管道中没有更多值。 s2 仍会产生(并打印)更多值,但没有人请求这些值,因此执行停止。
  • s4 再次查看传递给它的所有值并打印它们。
  • forEach 消费并打印 s4 传递给它的任何内容。

这样想。整个流完全是懒惰的。只有终端操作主动 从管道中拉取 新值。从 s4 <- s3 <- s2 <- s1 中提取 3 个值后,s3 将不再产生新值,并且不再从 s2 <- s1 中提取任何值。虽然 s1 -> s2 仍然能够生成 4-9,但这些值永远不会从管道中提取,因此永远不会被 s2.

打印出来

skip(6)

skip() 发生同样的事情:

Stream<Integer> s1 = Stream.of(1,2,3,4,5,6,7,8,9);
Stream<Integer> s2 = s1.peek(x->System.out.print("\nA"+x));
Stream<Integer> s3 = s2.skip(6);
Stream<Integer> s4 = s3.peek(x->System.out.print("B"+x));

s4.forEach(x->System.out.print("C"+x));
  • s1 包含 9 个不同的 Integer 值。
  • s2 查看传递给它的所有值并打印它们。
  • s3 消耗前 6 个值,"skipping them",这意味着前 6 个值不会传递给 s4,仅随后的值是。
  • s4 再次查看传递给它的所有值并打印它们。
  • forEach 消费并打印 s4 传递给它的任何内容。

这里重要的是 s2 不知道剩余的管道跳过任何值。 s2 独立于之后发生的事情查看所有值。

另一个例子:

考虑这条管道,which is listed in this blog post

IntStream.iterate(0, i -> ( i + 1 ) % 2)
         .distinct()
         .limit(10)
         .forEach(System.out::println);

当你执行上面的命令时,程序将永远不会停止。为什么?因为:

IntStream i1 = IntStream.iterate(0, i -> ( i + 1 ) % 2);
IntStream i2 = i1.distinct();
IntStream i3 = i2.limit(10);

i3.forEach(System.out::println);

这意味着:

  • i1 生成无限量的交替值:010101, ...
  • i2 消耗之前遇到的所有值,只传递 "new" 值,即总共有 2 个值出来i2.
  • i3 传递 10 个值,然后停止。

这个算法永远不会停止,因为 i3 等待 i201 之后再产生 8 个值,但是这些值永远不会出现,而 i1 永远不会停止向 i2.

提供值

在管道中的某个时刻,产生了 10 个以上的值并不重要。重要的是 i3 从未见过这 10 个值。

回答你的问题:

Is it just that "every action before skip is executed while not everyone before limit is"?

没有。执行 skip()limit() 之前的所有操作。在你的两次处决中,你得到 A1 - A3。但是 limit() 可能会使管道短路,一旦感兴趣的事件(达到限制)发生就中止价值消费。

你这里有两条流管道。

这些流管道分别由一个源、几个中间操作和一个终端操作组成。

但是中间操作比较懒。这意味着除非下游操作需要一个项目,否则什么也不会发生。当它这样做时,中间操作会完成它需要的所有工作来生成所需的项目,然后再次等待直到请求另一个项目,依此类推。

终端操作通常是"eager"。也就是说,他们要求流中完成他们所需的所有项目。

所以你真的应该把管道想象成 forEach 向它后面的流询问下一个项目,然后那个流向它后面的流询问,依此类推,一直到源头。

考虑到这一点,让我们看看您的第一个管道有什么:

Stream.of(1,2,3,4,5,6,7,8,9)
        .peek(x->System.out.print("\nA"+x))
        .limit(3)
        .peek(x->System.out.print("B"+x))
        .forEach(x->System.out.print("C"+x));

因此,forEach 要求第一项。这意味着 "B" peek 需要一个项目,并为它请求 limit 输出流,这意味着 limit 将需要请求 "A" peek,这是源代码。给定一个项目,一直到 forEach,您得到第一行:

A1B1C1

forEach 要求另一个项目,然后另一个。每次,请求都会向上传播并执行。但是当forEach请求第四项时,当请求到limit时,它知道它已经给出了所有允许给的项。

因此,它不会要求 "A" 查看另一个项目。它立即表明其项目已用完,因此不再执行任何操作并且 forEach 终止。

第二条管道发生了什么?

    Stream.of(1,2,3,4,5,6,7,8,9)
    .peek(x->System.out.print("\nA"+x))
    .skip(6)
    .peek(x->System.out.print("B"+x))
    .forEach(x->System.out.print("C"+x));

同样,forEach 要求第一项。这是传播回来的。但是当它到达 skip 时,它知道必须从其上游请求 6 项才能将一项传递给下游。因此它从 "A" peek 向上游发出请求,在不向下游传递的情况下使用它,发出另一个请求,等等。因此 "A" peek 获得了 6 个项目请求并生成了 6 个打印件,但这些项目没有传递下去。

A1
A2
A3
A4
A5
A6

skip 提出的第 7 个请求中,该项目被传递到 "B" peek,然后从它传递到 forEach,因此完整打印完成:

A7B7C7

然后就和以前一样了。 skip 现在,每当它收到请求时,都会向上游请求一个项目并将其传递给下游,因为它 "knows" 它已经完成了它的跳过工作。因此,其余的印刷品将通过整个管道,直到源耗尽。

所有的流都是基于spliterators的,spliterators基本有两种操作:advance(向前移动一个元素,类似于iterator)和split(在任意位置分割自己,适合并行处理)。您可以随时停止获取输入元素(由 limit 完成),但您不能跳转到任意位置(Spliterator 界面中没有这样的操作)。因此 skip 操作需要实际从源中读取第一个元素以忽略它们。请注意,在某些情况下您可以执行实际跳跃:

List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9);

list.stream().skip(3)... // will read 1,2,3, but ignore them
list.subList(3, list.size()).stream()... // will actually jump over the first three elements

单独查看 Steam 操作完全是亵渎神明,因为这不是评估流的方式。

说到limit(3),是个短路操作,想想也是有道理的,不管什么操作都是before ] 和 after limit,在流中有限制将在获得 n 个元素后停止迭代 till limit操作,但这并不意味着只处理n个流元素。以这个不同的流操作为例

public class App 
{
    public static void main(String[] args) {
        Stream.of(1,2,3,4,5,6,7,8,9)
        .peek(x->System.out.print("\nA"+x))
        .filter(x -> x%2==0)
        .limit(3)
        .peek(x->System.out.print("B"+x))
        .forEach(x->System.out.print("C"+x));
    }
}

会输出

A1
A2B2C2
A3
A4B4C4
A5
A6B6C6

这似乎是正确的,因为 limit 正在等待 3 个流元素通过操作链,尽管处理了 6 个流元素。

也许这个小图有助于获得一些关于如何处理流的自然 "feeling"。

第一行=>8=>=7=...===描述了流。元素 1..8 从左向右流动。一共有三个"windows":

  1. 在第一个 window (peek A) 你看到了一切
  2. 在第二个 window(skip 6limit 3)中完成了一种过滤。第一个或最后一个元素是 "eliminated" - 表示不传递以进行进一步处理。
  3. 在第三个 window 中,您只会看到那些已传递的项目

┌────────────────────────────────────────────────────────────────────────────┐ │ │ │▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸ ▸▸▸▸▸▸▸▸▸▸▸ ▸▸▸▸▸▸▸▸▸▸ ▸▸▸▸▸▸▸▸▸ │ │ 8 7 6 5 4 3 2 1 │ │▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸ ▲ ▸▸▸▸▸▸▸▸▸▸▸ ▲ ▸▸▸▸▸▸▸▸▸▸ ▲ ▸▸▸▸▸▸▸▸▸ │ │ │ │ │ │ │ │ skip 6 │ │ │ peek A limit 3 peek B │ └────────────────────────────────────────────────────────────────────────────┘

可能并非此解释中的所有内容(甚至可能不是任何内容)在技术上都是完全正确的。但是当我这样看时,我很清楚哪些项目到达了哪些串联指令。