takeWhile() 与 flatmap 的工作方式不同

takeWhile() working differently with flatmap

我正在使用 takeWhile 创建片段以探索其可能性。与 flatMap 结合使用时,行为不符合预期。请在下面找到代码片段。

String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};

Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));

实际输出:

Sample1
Sample2
Sample3
Sample5

预期输出:

Sample1
Sample2
Sample3

预期的原因是 takeWhile 应该一直执行到内部条件变为真为止。我还在 flatmap 中添加了打印输出语句以进行调试。流仅返回两次,符合预期。

但是,如果链中没有平面图,这也能正常工作。

String[] strArraySingle = {"Sample3", "Sample4", "Sample5"};
Arrays.stream(strArraySingle)
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));

实际输出:

Sample3

此处实际输出与预期输出相符。

免责声明:这些片段仅用于代码练习,不提供任何有效用例。

更新: 错误 JDK-8193856:修复将作为 JDK 10 的一部分提供。 更改将更正 whileOps Sink::accept

@Override 
public void accept(T t) {
    if (take = predicate.test(t)) {
        downstream.accept(t);
    }
}

更改实施:

@Override
public void accept(T t) {
    if (take && (take = predicate.test(t))) {
        downstream.accept(t);
    }
}

原因是使用了 flatMap operation also being an intermediate operations with which (one of) the stateful short-circuiting intermediate operation takeWhile

Holger 在 中指出的 flatMap 的行为当然是一个参考,不应错过以了解此类短路操作的意外输出。

您可以通过引入终端操作来拆分这两个中间操作来实现您的预​​期结果,以进一步确定性地使用有序流并针对示例执行它们:

List<String> sampleList = Arrays.stream(strArray).flatMap(Arrays::stream).collect(Collectors.toList());
sampleList.stream().takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
            .forEach(System.out::println);

另外,似乎有一个相关的 Bug#JDK-8075939 来追踪这个已经注册的行为。

编辑:这可以在 JDK-8193856 处进一步跟踪,接受为错误。

如果你看the documentation for takeWhile

if this stream is ordered, [returns] a stream consisting of the longest prefix of elements taken from this stream that match the given predicate.

if this stream is unordered, [returns] a stream consisting of a subset of elements taken from this stream that match the given predicate.

您的流被巧合地订购了,但 takeWhile 不知道 它是。因此,它返回第二个条件 - 子集。你的 takeWhile 表现得像 filter

如果您在 takeWhile 之前添加对 sorted 的调用,您将看到预期的结果:

Arrays.stream(strArray)
      .flatMap(indStream -> Arrays.stream(indStream))
      .sorted()
      .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
      .forEach(ele -> System.out.println(ele));

这是 JDK 9 中的错误 - 来自 issue #8193856:

takeWhile is incorrectly assuming that an upstream operation supports and honors cancellation, which unfortunately is not the case for flatMap.

说明

如果流是有序的,takeWhile 应该显示预期的行为。这在您的代码中并非完全如此,因为您使用 forEach,它放弃了顺序。如果您关心它,就像您在本例中所做的那样,您应该改用 forEachOrdered。有趣的是:这并没有改变任何东西。

所以也许流首先没有被排序? (在那种情况下 。)如果你为从 strArray 创建的流创建一个临时变量,并通过在断点处执行表达式 ((StatefulOp) stream).isOrdered(); 检查它是否有序,你会发现它确实被订购了:

String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};

Stream<String> stream = Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));

// breakpoint here
System.out.println(stream);

这意味着这很可能是一个实现错误。

进入代码

正如其他人所怀疑的那样,我现在也认为这 可能 flatMap 渴望有关。更准确地说,这两个问题可能有相同的根本原因。

查看WhileOps的源码,我们可以看到这些方法:

@Override
public void accept(T t) {
    if (take = predicate.test(t)) {
        downstream.accept(t);
    }
}

@Override
public boolean cancellationRequested() {
    return !take || downstream.cancellationRequested();
}

takeWhile 使用此代码检查给定流元素 t 是否满足 predicate:

  • 如果是,它将元素传递给 downstream 操作,在本例中为 System.out::println
  • 如果不是,它将 take 设置为 false,因此当下次询问是否应取消管道时(即完成),它 returns true

这涵盖了 takeWhile 操作。您需要知道的另一件事是 forEachOrdered 导致终端操作执行方法 ReferencePipeline::forEachWithCancel:

@Override
final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    boolean cancelled;
    do { } while (
            !(cancelled = sink.cancellationRequested())
            && spliterator.tryAdvance(sink));
    return cancelled;
}

所有这一切都是:

  1. 检查管道是否被取消
  2. 如果不是,则将接收器提前一个元素
  3. 如果这是最后一个元素则停止

看起来很有前途,对吧?

没有flatMap

在 "good case" 中(没有 flatMap;你的第二个例子)forEachWithCancel 直接在 WhileOp 上作为 sink 操作,你可以看到这是如何播放:

  • ReferencePipeline::forEachWithCancel 执行循环:
    • WhileOps::accept 被赋予每个流元素
    • WhileOps::cancellationRequested在每个元素后查询
  • 在某些时候"Sample4" 谓词失败并且流被取消

耶!

flatMap

在 "bad case"(使用 flatMap;您的第一个示例)中,forEachWithCancelflatMap 操作进行操作,不过,它只是调用 forEachRemainingArraySpliterator 上用于 {"Sample3", "Sample4", "Sample5"},这样做:

if ((a = array).length >= (hi = fence) &&
    (i = index) >= 0 && i < (index = hi)) {
    do { action.accept((T)a[i]); } while (++i < hi);
}

忽略所有 hifence 的东西,只有在数组处理被拆分为并行流时才使用,这是一个简单的 for 循环,它传递每个takeWhile 操作的元素,但从不检查它是否被取消。因此,它将在停止之前急切地遍历 "substream" 中的所有元素,甚至可能 .

无论我怎么看,这 都是 一个错误 - 感谢 Holger 的评论。我不想把这个答案放在这里(说真的!),但是答案的 none 清楚地指出这是一个错误。

人们说这与 ordered/un-ordered 有关,但事实并非如此,因为这将报告 true 3 次:

Stream<String[]> s1 = Arrays.stream(strArray);
System.out.println(s1.spliterator().hasCharacteristics(Spliterator.ORDERED));

Stream<String> s2 = Arrays.stream(strArray)
            .flatMap(indStream -> Arrays.stream(indStream));
System.out.println(s2.spliterator().hasCharacteristics(Spliterator.ORDERED));

Stream<String> s3 = Arrays.stream(strArray)
            .flatMap(indStream -> Arrays.stream(indStream))
            .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));
System.out.println(s3.spliterator().hasCharacteristics(Spliterator.ORDERED));

如果你把它改成:

也很有趣
String[][] strArray = { 
         { "Sample1", "Sample2" }, 
         { "Sample3", "Sample5", "Sample4" }, // Sample4 is the last one here
         { "Sample7", "Sample8" } 
};

那么 Sample7Sample8 将不会是输出的一部分,否则它们会。 flatmap 似乎忽略了 将由 dropWhile 引入的取消标志。