为什么 flatMap() 之后的 filter() 在 Java 流中 "not completely" 是惰性的?

Why filter() after flatMap() is "not completely" lazy in Java streams?

我有以下示例代码:

System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);
System.out.println("-----------");
System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);

输出结果如下:

1
Result: 1
-----------
-1
0
1
0
1
2
1
2
3
Result: -1

从这里我看到在第一种情况下 stream 确实表现得很懒惰 - 我们使用 findFirst() 所以一旦我们有了第一个元素,我们的过滤 lambda 就不会被调用。 然而,在第二种使用 flatMaps 的情况下,我们看到尽管找到了满足过滤条件的第一个元素(它只是任何第一个元素,因为 lambda 总是 returns true),流的更多内容仍在通过过滤功能馈送。

我试图理解为什么它的行为是这样的,而不是像第一种情况那样在计算第一个元素后放弃。 任何有用的信息将不胜感激。

输入流的元素被一个一个地延迟消耗。第一个元素 1 由两个 flatMap 转换为流 -1, 0, 1, 0, 1, 2, 1, 2, 3,因此整个流仅对应于第一个输入元素。嵌套流被管道急切地具体化,然后展平,然后馈送到 filter 阶段。这解释了你的输出。

以上内容并非源于基本限制,但可能会使嵌套流的完全惰性变得更加复杂。我怀疑让它具有高性能将是一个更大的挑战。

为了进行比较,Clojure 的惰性序列为每个这样的嵌套级别都进行了另一层包装。由于这种设计,当嵌套进行到极致时,操作甚至可能会失败WhosebugError

TL;DR,这已在 JDK-8075939 and fixed in Java 10 (and backported to Java 8 in JDK-8225328) 中解决。

在查看实现 (ReferencePipeline.java) 时,我们看到方法 [link]

@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}

将为 findFirst 操作调用。需要特别注意的是 sink.cancellationRequested() 允许在第一次匹配时结束循环。比较 [link]

@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);
    // We can do better than this, by polling cancellationRequested when stream is infinite
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    try (Stream<? extends R> result = mapper.apply(u)) {
                        // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
                        if (result != null)
                            result.sequential().forEach(downstream);
                    }
                }
            };
        }
    };
}

推进一项的方法最终在子流上调用forEach,没有任何提前终止的可能性,flatMap方法开头的注释甚至说明了这个缺失的特性.

由于这不仅仅是一个优化问题,因为它意味着当子流无限大时代码会简单地中断,我希望开发人员尽快证明他们“可以做得比这更好”……


为了说明其含义,虽然 Stream.iterate(0, i->i+1).findFirst() 按预期工作,但 Stream.of("").flatMap(x->Stream.iterate(0, i->i+1)).findFirst() 将陷入无限循环。

关于规范,大部分可以在

中找到

chapter “Stream operations and pipelines” of the package specification:

Intermediate operations return a new stream. They are always lazy;

… Laziness also allows avoiding examining all the data when it is not necessary; for operations such as "find the first string longer than 1000 characters", it is only necessary to examine just enough strings to find one that has the desired characteristics without examining all of the strings available from the source. (This behavior becomes even more important when the input stream is infinite and not merely large.)

Further, some operations are deemed short-circuiting operations. An intermediate operation is short-circuiting if, when presented with infinite input, it may produce a finite stream as a result. A terminal operation is short-circuiting if, when presented with infinite input, it may terminate in finite time. Having a short-circuiting operation in the pipeline is a necessary, but not sufficient, condition for the processing of an infinite stream to terminate normally in finite time.

很明显,短路操作并不能保证有限时间终止,例如当过滤器与任何项目都不匹配时,处理无法完成,但是通过简单地忽略操作的短路性质而在有限时间内不支持任何终止的实现远离规范。

关于无限子流的破坏,当一个 中间(相对于终端)短路操作时,flatMap 的行为变得更加令人惊讶。

虽然下面的工作按预期进行,但打印出无限的整数序列

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).forEach(System.out::println);

以下代码仅打印出“1”,但仍然终止:

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).limit(1).forEach(System.out::println);

我无法想象阅读规范时这不是错误。

在我的免费 StreamEx library I introduced the short-circuiting collectors. When collecting sequential stream with short-circuiting collector (like MoreCollectors.first()) 中,正好从源中消耗了一个元素。在内部,它以非常肮脏的方式实现:使用自定义异常来中断控制流。使用我的库,您的示例可以这样重写:

System.out.println(
        "Result: " +
                StreamEx.of(1, 2, 3)
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .collect(MoreCollectors.first())
                .get()
        );

结果如下:

-1
Result: -1

我同意其他人的看法,这是在 JDK-8075939. And since it's still not fixed more than one year later. I would like to recommend you: AbacusUtil

上发现的错误
N.println("Result: " + Stream.of(1, 2, 3).peek(N::println).first().get());

N.println("-----------");

N.println("Result: " + Stream.of(1, 2, 3)
                        .flatMap(i -> Stream.of(i - 1, i, i + 1))
                        .flatMap(i -> Stream.of(i - 1, i, i + 1))
                        .peek(N::println).first().get());

// output:
// 1
// Result: 1
// -----------
// -1
// Result: -1

披露:我是 AbacusUtil 的开发者。

可惜.flatMap()并不懒惰。但是,此处提供了自定义 flatMap 解决方法:Why .flatMap() is so inefficient (non lazy) in java 8 and java 9

今天我也无意中发现了这个错误。行为不是那么直截了当,导致简单的情况,如下所示,工作正常,但类似的生产代码不起作用。

 stream(spliterator).map(o -> o).flatMap(Stream::of).flatMap(Stream::of).findAny()

对于不能再等几年再迁移到 JDK-10 的人来说,还有另一种真正的懒惰流。它不支持并行。它专用于 JavaScript 翻译,但它对我有用,因为界面是一样的。

StreamHelper 是基于集合的,但很容易适配 Spliterator。

https://github.com/yaitskov/j4ts/blob/stream/src/main/java/javaemul/internal/stream/StreamHelper.java

虽然 JDK-8075939 has been fixed in Java 11 and backported to 10 and 8u222, there's still an edge case of flatMap() not being truly lazy when using Stream.iterator(): JDK-8267359,仍然存在于 Java 17.

这个

Iterator<Integer> it =
    Stream.of("a", "b")
        .flatMap(s -> Stream
            .of(1, 2, 3, 4)
            .filter(i -> { System.out.println(i); return true; }))
        .iterator();

it.hasNext(); // This consumes the entire flatmapped stream
it.next();

版画

1
2
3
4

同时:

Iterator<Integer> it =
    Stream.of("a", "b")
        .flatMap(s -> Stream
            .iterate(1, i -> i)
            .filter(i -> { System.out.println(i); return true; }))
        .iterator();

it.hasNext();
it.next();

永不终止