Is Stream.findAny a short-circuit operation?
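Consider this code:
Optional<?> found = collection.stream()
        .filter(s -> myPredicate1(s))
        .filter(s -> myPredicate2(s))
        .findAny();
Will it process the whole stream, calling both myPredicate1 and myPredicate2 for every element of the collection? Or will only as many predicates be called as are needed to actually find the value?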
Yes, as the Stream.findAny() documentation states:
This is a short-circuiting terminal operation.
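It's a common misconception that objects in a stream are "pushed" towards the consuming operation. It's actually the other way around: the consuming operation pulls each element.
For a sequential stream, only as many predicates are called as are needed to find a matching value.
A parallel stream may execute more predicates, but it will also stop execution as soon as an element is found.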
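To make the sequential case concrete for the question's two-filter pipeline, here is a minimal sketch that counts how often each predicate runs. The bodies of myPredicate1 and myPredicate2 are hypothetical stand-ins (the question doesn't define them), List.of assumes Java 9+, and the exact counts reflect how the OpenJDK implementation traverses a sequential stream.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class PredicateCallCount {
    // how many times each predicate has been evaluated
    static final AtomicInteger calls1 = new AtomicInteger();
    static final AtomicInteger calls2 = new AtomicInteger();

    // hypothetical stand-in for the question's myPredicate1
    static boolean myPredicate1(int x) {
        calls1.incrementAndGet();
        return x % 2 == 0;
    }

    // hypothetical stand-in for the question's myPredicate2
    static boolean myPredicate2(int x) {
        calls2.incrementAndGet();
        return x > 5;
    }

    static Optional<Integer> run() {
        List<Integer> collection = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        return collection.stream()
                .filter(s -> myPredicate1(s))
                .filter(s -> myPredicate2(s))
                .findAny();
    }

    public static void main(String[] args) {
        Optional<Integer> found = run();
        // elements 1..6 reach myPredicate1; only 2, 4 and 6 reach myPredicate2;
        // 7..10 are never pulled from the source
        System.out.println(found + " p1=" + calls1.get() + " p2=" + calls2.get());
    }
}
```

Running main prints `Optional[6] p1=6 p2=3`: neither predicate is called for every element.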
import java.util.Arrays;
import java.util.Optional;

public class StreamFilterLazyTest {
    static int stI = 0;

    static class T {
        int i;

        public T() {
            this.i = ++stI; // sequential ids: 1, 2, 3, ...
        }

        int getI() {
            // log every predicate evaluation (stdout keeps the order deterministic)
            System.out.println("getI: " + i);
            return i;
        }
    }

    public static void main(String[] args) {
        T[] arr = {new T(), new T(), new T(), new T(), new T(), new T(), new T(), new T(), new T(), new T()};
        Optional<T> found = Arrays.stream(arr).filter(t -> t.getI() == 3).findAny();
        // read the field directly so the logging getter isn't called a second time
        System.out.println("Found: " + found.get().i);
    }
}
This prints:
getI: 1
getI: 2
getI: 3
Found: 3
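Well, it doesn't matter whether sequential or parallel streams are used: they will still traverse only "as many elements as required" to find the first one that matches. It might be different if you use findFirst and you have a Stream made from an ordered collection.
findFirst must preserve the order in this case.
Here, due to parallelism, the second and then the third element might be processed before the first, but still only the first will be returned.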
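A minimal sketch of that findFirst guarantee, assuming a source with a defined encounter order (IntStream.rangeClosed is ordered). The range and the "multiple of 100" predicate are arbitrary illustrations.

```java
import java.util.OptionalInt;
import java.util.stream.IntStream;

public class FindFirstParallel {
    static OptionalInt firstMultipleOf100() {
        // rangeClosed produces an ORDERED stream, so findFirst must honour
        // encounter order even when chunks are processed in parallel
        return IntStream.rangeClosed(1, 10_000)
                .parallel()
                .filter(i -> i % 100 == 0)
                .findFirst();
    }

    public static void main(String[] args) {
        // later chunks may be filtered before earlier ones, but the result is
        // always the earliest match in encounter order
        System.out.println(firstMultipleOf100());
    }
}
```

This always prints `OptionalInt[100]`, whereas findAny on the same parallel stream is allowed to return any multiple of 100.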
The javadoc of findAny() states:
"This is a short-circuiting terminal operation."
"The behavior of this operation is explicitly nondeterministic; it is free to select any element in the stream. This is to allow for maximal performance in parallel operations ..."
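This means that findAny() on a sequential stream will only "pull" as many elements as it takes to find the first match. On a parallel stream it may pull more than that, depending on the implementation.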
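A sketch comparing the two cases by counting predicate evaluations. The range and predicate are illustrative assumptions; the sequential count of exactly 100 reflects the OpenJDK implementation, while the parallel count and the returned value can differ from run to run and machine to machine.

```java
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class FindAnyParallel {
    static final AtomicInteger evaluations = new AtomicInteger();

    static OptionalInt anyMultipleOf100(boolean parallel) {
        evaluations.set(0);
        IntStream s = IntStream.rangeClosed(1, 1_000_000);
        if (parallel) {
            s = s.parallel();
        }
        return s.filter(i -> {
                    evaluations.incrementAndGet(); // count every predicate call
                    return i % 100 == 0;
                })
                .findAny();
    }

    public static void main(String[] args) {
        OptionalInt seq = anyMultipleOf100(false);
        // sequential: stops right after the first match
        System.out.println("sequential: " + seq + ", evaluations = " + evaluations.get());

        OptionalInt par = anyMultipleOf100(true);
        // parallel: any multiple of 100 may be returned, and several worker
        // threads may evaluate elements before cancellation takes effect
        System.out.println("parallel: " + par + ", evaluations = " + evaluations.get());
    }
}
```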
The package javadoc also states:
"Intermediate operations return a new stream. They are always lazy; executing an intermediate operation such as filter() does not actually perform any filtering, but instead creates a new stream that, when traversed, contains the elements of the initial stream that match the given predicate. Traversal of the pipeline source does not begin until the terminal operation of the pipeline is executed."
This means that the filter() predicates are only evaluated when the findAny() terminal operation pulls elements through them.
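A small sketch of that laziness: the counter stays at zero after filter() is called and only moves once findAny() starts the traversal. The three-element source is an arbitrary example; the count of 2 after the terminal operation assumes a sequential stream.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyFilter {
    static int[] demo() {
        AtomicInteger calls = new AtomicInteger();
        Stream<String> pipeline = Stream.of("a", "b", "c")
                .filter(s -> {
                    calls.incrementAndGet();
                    return s.equals("b");
                });
        int beforeTerminal = calls.get(); // filter() only built a new stage
        pipeline.findAny();               // traversal of the source starts here
        int afterTerminal = calls.get();  // "a" and "b" were tested, "c" never was
        return new int[] {beforeTerminal, afterTerminal};
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("before findAny: " + counts[0] + ", after: " + counts[1]);
    }
}
```

This prints `before findAny: 0, after: 2`.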
In short:
Q: Is filter + findAny still a short-circuit operation?
A: Yes.
Stream#findAny is a short-circuiting terminal operation. Since Stream#filter returns a new stream each time, each element is passed through the Predicates one by one, and traversal stops as soon as an element passes all of them.
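To see that "one by one" order, this sketch logs each predicate call. The inputs and predicates are illustrative; the trace assumes a sequential stream, where each element travels through both filters before the next one is pulled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

public class ElementWiseTraversal {
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Optional<Integer> found = Stream.of(1, 2, 3)
                .filter(x -> { log.add("p1:" + x); return true; })    // lets everything through
                .filter(x -> { log.add("p2:" + x); return x == 2; })  // matches only 2
                .findAny();
        log.add("found:" + found.orElse(-1));
        return log;
    }

    public static void main(String[] args) {
        // element 1 goes through p1 and p2, then element 2 does; 3 is never visited
        System.out.println(trace());
    }
}
```

This prints `[p1:1, p2:1, p1:2, p2:2, found:2]`: the pipeline doesn't run p1 over the whole stream first.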
Intermediate operations return a new stream. They are always lazy; executing an intermediate operation such as filter() does not actually perform any filtering, but instead creates a new stream that, when traversed, contains the elements of the initial stream that match the given predicate. Traversal of the pipeline source does not begin until the terminal operation of the pipeline is executed.
As @Holger mentioned in a comment, chained filters short-circuit like this:
if(predicate1.test(value) && predicate2.test(value)){
....
}
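The same && semantics are available through Predicate.and, whose javadoc says the second predicate isn't evaluated when the first is false. This sketch (predicates and inputs are illustrative) checks that filter(p1).filter(p2) and filter(p1.and(p2)) give the same result and call p2 equally often: only for elements that passed p1.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class AndEquivalence {
    static final AtomicInteger p2Calls = new AtomicInteger();

    static final Predicate<Integer> isEven = x -> x % 2 == 0;
    static final Predicate<Integer> greaterThanFive = x -> {
        p2Calls.incrementAndGet(); // count how often the second predicate runs
        return x > 5;
    };

    // two chained filter stages
    static List<Integer> chained(List<Integer> input) {
        return input.stream().filter(isEven).filter(greaterThanFive).collect(Collectors.toList());
    }

    // one stage with a short-circuiting composed predicate
    static List<Integer> combined(List<Integer> input) {
        return input.stream().filter(isEven.and(greaterThanFive)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        p2Calls.set(0);
        List<Integer> a = chained(input);
        int chainedCalls = p2Calls.getAndSet(0);
        List<Integer> b = combined(input);
        int combinedCalls = p2Calls.get();
        System.out.println(a + " p2 calls: " + chainedCalls);
        System.out.println(b + " p2 calls: " + combinedCalls);
    }
}
```

Both lines print `[6, 8, 10] p2 calls: 5`, since only the five even numbers ever reach greaterThanFive.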
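Test:
import java.util.Iterator;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class ShortCircuitTest {
    public static void main(String[] args) {
        // the first call rejects the element; a second call would throw
        Iterator<Predicate<Integer>> predicates = Stream.<Predicate<Integer>>of(
                it -> false,
                it -> {
                    throw new AssertionError("Can't be short-circuited!");
                }
        ).iterator();
        Predicate<Integer> expectsToBeShortCircuited = it -> predicates.next().test(it);

        Stream.of(1)
                .filter(expectsToBeShortCircuited)
                // short-circuited here: the only element was rejected above,
                // so this filter is never invoked and nothing is thrown
                .filter(expectsToBeShortCircuited)
                .findAny();
    }
}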
You can use peek to verify this:
== Sequential ==
Alpha1 Alpha2 Beta1 Beta2 Gamma1 Gamma2 Dolphin1 Fargo1 Fargo2
Found: Fargo
Applications: 9
== Parallel ==
Arnold1 Jim1 Loke1 Alpha1 Mustard1 Lenny1 Mustard2 Mark1 Alpha2 Mark2 Beta1 Beta2 Gamma1 Fargo1 Gamma2 Dolphin1 Fargo2
Found: Fargo
Applications: 17
YMMV depending on the number of cores, etc.
This output was produced by the following code:
package test.test;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
public class Snippet {
static AtomicInteger predicateApplications;
public static void main(String arr[]) {
System.out.println("== Sequential == \n");
sequential();
System.out.println(" == Parallel == \n");
parallel();
}
private static void sequential() {
Stream<String> stream = Stream.of("Alpha", "Beta", "Gamma", "Dolphin", "Fargo", "Mustard", "Lenny", "Mark",
"Jim", "Arnold", "Loke");
execute(stream);
}
private static void parallel() {
Stream<String> parallelStream = Stream
.of("Alpha", "Beta", "Gamma", "Dolphin", "Fargo", "Mustard", "Lenny", "Mark", "Jim", "Arnold", "Loke")
.parallel();
execute(parallelStream);
}
private static void execute(Stream<String> stream) {
predicateApplications = new AtomicInteger(0);
Optional<String> findAny = stream.peek(s -> print(s + "1")).filter(s -> s.contains("a"))
.peek(s -> print(s + "2")).filter(s -> s.startsWith("F")).findAny();
String found = findAny.orElse("NONE");
System.out.println("\nFound: " + found);
System.out.println("Applications: " + predicateApplications.get());
}
private static void print(String s) {
System.out.print(s + " ");
predicateApplications.incrementAndGet();
}
}