takeWhile() 与 flatmap 的工作方式不同
takeWhile() working differently with flatmap
我正在使用 takeWhile 创建片段以探索其可能性。与 flatMap 结合使用时,行为不符合预期。请在下面找到代码片段。
String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};
Arrays.stream(strArray)
.flatMap(indStream -> Arrays.stream(indStream))
.takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
.forEach(ele -> System.out.println(ele));
实际输出:
Sample1
Sample2
Sample3
Sample5
预期输出:
Sample1
Sample2
Sample3
预期的原因是 takeWhile 应该一直执行到内部条件变为真为止。我还在 flatmap 中添加了打印输出语句以进行调试。流仅返回两次,符合预期。
但是,如果链中没有平面图,这也能正常工作。
String[] strArraySingle = {"Sample3", "Sample4", "Sample5"};
Arrays.stream(strArraySingle)
.takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
.forEach(ele -> System.out.println(ele));
实际输出:
Sample3
此处实际输出与预期输出相符。
免责声明:这些片段仅用于代码练习,不提供任何有效用例。
更新:
错误 JDK-8193856:修复将作为 JDK 10 的一部分提供。
更改将更正 whileOps
Sink::accept
@Override
public void accept(T t) {
if (take = predicate.test(t)) {
downstream.accept(t);
}
}
更改实施:
@Override
public void accept(T t) {
if (take && (take = predicate.test(t))) {
downstream.accept(t);
}
}
原因是使用了 flatMap
operation also being an intermediate operations with which (one of) the stateful short-circuiting intermediate operation takeWhile
。
Holger 在 中指出的 flatMap
的行为当然是一个参考,不应错过以了解此类短路操作的意外输出。
您可以通过引入终端操作来拆分这两个中间操作来实现您的预期结果,以进一步确定性地使用有序流并针对示例执行它们:
List<String> sampleList = Arrays.stream(strArray).flatMap(Arrays::stream).collect(Collectors.toList());
sampleList.stream().takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
.forEach(System.out::println);
另外,似乎有一个相关的 Bug#JDK-8075939 来追踪这个已经注册的行为。
编辑:这可以在 JDK-8193856 处进一步跟踪,接受为错误。
如果你看the documentation for takeWhile
:
if this stream is ordered, [returns] a stream consisting of the
longest prefix of elements taken from this stream that match the given
predicate.
if this stream is unordered, [returns] a stream consisting of a subset
of elements taken from this stream that match the given predicate.
您的流被巧合地订购了,但 takeWhile
不知道 它是。因此,它返回第二个条件 - 子集。你的 takeWhile
表现得像 filter
。
如果您在 takeWhile
之前添加对 sorted
的调用,您将看到预期的结果:
Arrays.stream(strArray)
.flatMap(indStream -> Arrays.stream(indStream))
.sorted()
.takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
.forEach(ele -> System.out.println(ele));
这是 JDK 9 中的错误 - 来自 issue #8193856:
takeWhile
is incorrectly assuming that an upstream operation supports and honors cancellation, which unfortunately is not the case for flatMap
.
说明
如果流是有序的,takeWhile
应该显示预期的行为。这在您的代码中并非完全如此,因为您使用 forEach
,它放弃了顺序。如果您关心它,就像您在本例中所做的那样,您应该改用 forEachOrdered
。有趣的是:这并没有改变任何东西。
所以也许流首先没有被排序? (在那种情况下 。)如果你为从 strArray
创建的流创建一个临时变量,并通过在断点处执行表达式 ((StatefulOp) stream).isOrdered();
检查它是否有序,你会发现它确实被订购了:
String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};
Stream<String> stream = Arrays.stream(strArray)
.flatMap(indStream -> Arrays.stream(indStream))
.takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));
// breakpoint here
System.out.println(stream);
这意味着这很可能是一个实现错误。
进入代码
正如其他人所怀疑的那样,我现在也认为这 可能 与 flatMap
渴望有关。更准确地说,这两个问题可能有相同的根本原因。
查看WhileOps
的源码,我们可以看到这些方法:
@Override
public void accept(T t) {
if (take = predicate.test(t)) {
downstream.accept(t);
}
}
@Override
public boolean cancellationRequested() {
return !take || downstream.cancellationRequested();
}
takeWhile
使用此代码检查给定流元素 t
是否满足 predicate
:
- 如果是,它将元素传递给
downstream
操作,在本例中为 System.out::println
。
- 如果不是,它将
take
设置为 false,因此当下次询问是否应取消管道时(即完成),它 returns true
。
这涵盖了 takeWhile
操作。您需要知道的另一件事是 forEachOrdered
导致终端操作执行方法 ReferencePipeline::forEachWithCancel
:
@Override
final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
boolean cancelled;
do { } while (
!(cancelled = sink.cancellationRequested())
&& spliterator.tryAdvance(sink));
return cancelled;
}
所有这一切都是:
- 检查管道是否被取消
- 如果不是,则将接收器提前一个元素
- 如果这是最后一个元素则停止
看起来很有前途,对吧?
没有flatMap
在 "good case" 中(没有 flatMap
;你的第二个例子)forEachWithCancel
直接在 WhileOp
上作为 sink
操作,你可以看到这是如何播放:
ReferencePipeline::forEachWithCancel
执行循环:
WhileOps::accept
被赋予每个流元素
WhileOps::cancellationRequested
在每个元素后查询
- 在某些时候
"Sample4"
谓词失败并且流被取消
耶!
和flatMap
在 "bad case"(使用 flatMap
;您的第一个示例)中,forEachWithCancel
对 flatMap
操作进行操作,不过,它只是调用 forEachRemaining
在 ArraySpliterator
上用于 {"Sample3", "Sample4", "Sample5"}
,这样做:
if ((a = array).length >= (hi = fence) &&
(i = index) >= 0 && i < (index = hi)) {
do { action.accept((T)a[i]); } while (++i < hi);
}
忽略所有 hi
和 fence
的东西,只有在数组处理被拆分为并行流时才使用,这是一个简单的 for
循环,它传递每个takeWhile
操作的元素,但从不检查它是否被取消。因此,它将在停止之前急切地遍历 "substream" 中的所有元素,甚至可能 .
无论我怎么看,这 都是 一个错误 - 感谢 Holger 的评论。我不想把这个答案放在这里(说真的!),但是答案的 none 清楚地指出这是一个错误。
人们说这与 ordered/un-ordered 有关,但事实并非如此,因为这将报告 true
3 次:
Stream<String[]> s1 = Arrays.stream(strArray);
System.out.println(s1.spliterator().hasCharacteristics(Spliterator.ORDERED));
Stream<String> s2 = Arrays.stream(strArray)
.flatMap(indStream -> Arrays.stream(indStream));
System.out.println(s2.spliterator().hasCharacteristics(Spliterator.ORDERED));
Stream<String> s3 = Arrays.stream(strArray)
.flatMap(indStream -> Arrays.stream(indStream))
.takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));
System.out.println(s3.spliterator().hasCharacteristics(Spliterator.ORDERED));
如果你把它改成:
也很有趣
String[][] strArray = {
{ "Sample1", "Sample2" },
{ "Sample3", "Sample5", "Sample4" }, // Sample4 is the last one here
{ "Sample7", "Sample8" }
};
那么 Sample7
和 Sample8
将不会是输出的一部分,否则它们会。 flatmap
似乎忽略了 将由 dropWhile
引入的取消标志。
我正在使用 takeWhile 创建片段以探索其可能性。与 flatMap 结合使用时,行为不符合预期。请在下面找到代码片段。
String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};
Arrays.stream(strArray)
.flatMap(indStream -> Arrays.stream(indStream))
.takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
.forEach(ele -> System.out.println(ele));
实际输出:
Sample1
Sample2
Sample3
Sample5
预期输出:
Sample1
Sample2
Sample3
预期的原因是 takeWhile 应该一直执行到内部条件变为真为止。我还在 flatmap 中添加了打印输出语句以进行调试。流仅返回两次,符合预期。
但是,如果链中没有平面图,这也能正常工作。
String[] strArraySingle = {"Sample3", "Sample4", "Sample5"};
Arrays.stream(strArraySingle)
.takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
.forEach(ele -> System.out.println(ele));
实际输出:
Sample3
此处实际输出与预期输出相符。
免责声明:这些片段仅用于代码练习,不提供任何有效用例。
更新:
错误 JDK-8193856:修复将作为 JDK 10 的一部分提供。
更改将更正 whileOps
Sink::accept
@Override
public void accept(T t) {
if (take = predicate.test(t)) {
downstream.accept(t);
}
}
更改实施:
@Override
public void accept(T t) {
if (take && (take = predicate.test(t))) {
downstream.accept(t);
}
}
原因是使用了 flatMap
operation also being an intermediate operations with which (one of) the stateful short-circuiting intermediate operation takeWhile
。
Holger 在 flatMap
的行为当然是一个参考,不应错过以了解此类短路操作的意外输出。
您可以通过引入终端操作来拆分这两个中间操作来实现您的预期结果,以进一步确定性地使用有序流并针对示例执行它们:
List<String> sampleList = Arrays.stream(strArray).flatMap(Arrays::stream).collect(Collectors.toList());
sampleList.stream().takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
.forEach(System.out::println);
另外,似乎有一个相关的 Bug#JDK-8075939 来追踪这个已经注册的行为。
编辑:这可以在 JDK-8193856 处进一步跟踪,接受为错误。
如果你看the documentation for takeWhile
:
if this stream is ordered, [returns] a stream consisting of the longest prefix of elements taken from this stream that match the given predicate.
if this stream is unordered, [returns] a stream consisting of a subset of elements taken from this stream that match the given predicate.
您的流被巧合地订购了,但 takeWhile
不知道 它是。因此,它返回第二个条件 - 子集。你的 takeWhile
表现得像 filter
。
如果您在 takeWhile
之前添加对 sorted
的调用,您将看到预期的结果:
Arrays.stream(strArray)
.flatMap(indStream -> Arrays.stream(indStream))
.sorted()
.takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
.forEach(ele -> System.out.println(ele));
这是 JDK 9 中的错误 - 来自 issue #8193856:
takeWhile
is incorrectly assuming that an upstream operation supports and honors cancellation, which unfortunately is not the case forflatMap
.
说明
如果流是有序的,takeWhile
应该显示预期的行为。这在您的代码中并非完全如此,因为您使用 forEach
,它放弃了顺序。如果您关心它,就像您在本例中所做的那样,您应该改用 forEachOrdered
。有趣的是:这并没有改变任何东西。
所以也许流首先没有被排序? (在那种情况下 strArray
创建的流创建一个临时变量,并通过在断点处执行表达式 ((StatefulOp) stream).isOrdered();
检查它是否有序,你会发现它确实被订购了:
String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};
Stream<String> stream = Arrays.stream(strArray)
.flatMap(indStream -> Arrays.stream(indStream))
.takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));
// breakpoint here
System.out.println(stream);
这意味着这很可能是一个实现错误。
进入代码
正如其他人所怀疑的那样,我现在也认为这 可能 与 flatMap
渴望有关。更准确地说,这两个问题可能有相同的根本原因。
查看WhileOps
的源码,我们可以看到这些方法:
@Override
public void accept(T t) {
if (take = predicate.test(t)) {
downstream.accept(t);
}
}
@Override
public boolean cancellationRequested() {
return !take || downstream.cancellationRequested();
}
takeWhile
使用此代码检查给定流元素 t
是否满足 predicate
:
- 如果是,它将元素传递给
downstream
操作,在本例中为System.out::println
。 - 如果不是,它将
take
设置为 false,因此当下次询问是否应取消管道时(即完成),它 returnstrue
。
这涵盖了 takeWhile
操作。您需要知道的另一件事是 forEachOrdered
导致终端操作执行方法 ReferencePipeline::forEachWithCancel
:
@Override
final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
boolean cancelled;
do { } while (
!(cancelled = sink.cancellationRequested())
&& spliterator.tryAdvance(sink));
return cancelled;
}
所有这一切都是:
- 检查管道是否被取消
- 如果不是,则将接收器提前一个元素
- 如果这是最后一个元素则停止
看起来很有前途,对吧?
没有flatMap
在 "good case" 中(没有 flatMap
;你的第二个例子)forEachWithCancel
直接在 WhileOp
上作为 sink
操作,你可以看到这是如何播放:
ReferencePipeline::forEachWithCancel
执行循环:WhileOps::accept
被赋予每个流元素WhileOps::cancellationRequested
在每个元素后查询
- 在某些时候
"Sample4"
谓词失败并且流被取消
耶!
和flatMap
在 "bad case"(使用 flatMap
;您的第一个示例)中,forEachWithCancel
对 flatMap
操作进行操作,不过,它只是调用 forEachRemaining
在 ArraySpliterator
上用于 {"Sample3", "Sample4", "Sample5"}
,这样做:
if ((a = array).length >= (hi = fence) &&
(i = index) >= 0 && i < (index = hi)) {
do { action.accept((T)a[i]); } while (++i < hi);
}
忽略所有 hi
和 fence
的东西,只有在数组处理被拆分为并行流时才使用,这是一个简单的 for
循环,它传递每个takeWhile
操作的元素,但从不检查它是否被取消。因此,它将在停止之前急切地遍历 "substream" 中的所有元素,甚至可能
无论我怎么看,这 都是 一个错误 - 感谢 Holger 的评论。我不想把这个答案放在这里(说真的!),但是答案的 none 清楚地指出这是一个错误。
人们说这与 ordered/un-ordered 有关,但事实并非如此,因为这将报告 true
3 次:
Stream<String[]> s1 = Arrays.stream(strArray);
System.out.println(s1.spliterator().hasCharacteristics(Spliterator.ORDERED));
Stream<String> s2 = Arrays.stream(strArray)
.flatMap(indStream -> Arrays.stream(indStream));
System.out.println(s2.spliterator().hasCharacteristics(Spliterator.ORDERED));
Stream<String> s3 = Arrays.stream(strArray)
.flatMap(indStream -> Arrays.stream(indStream))
.takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));
System.out.println(s3.spliterator().hasCharacteristics(Spliterator.ORDERED));
如果你把它改成:
也很有趣String[][] strArray = {
{ "Sample1", "Sample2" },
{ "Sample3", "Sample5", "Sample4" }, // Sample4 is the last one here
{ "Sample7", "Sample8" }
};
那么 Sample7
和 Sample8
将不会是输出的一部分,否则它们会。 flatmap
似乎忽略了 将由 dropWhile
引入的取消标志。