Best way to support lazy/late parallel in customised stream method in Java 8
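The Java implementation uses the ReferencePipeline class, which by design supports a lazy/late .parallel(). This means that the lines within each of the following groups are exactly equivalent: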
// sequential identical lines:
stream.map(mapper).distinct().filter(filter).sequential()...
stream.sequential().map(mapper).distinct().filter(filter)...
// parallel identical lines:
stream.map(mapper).distinct().filter(filter).parallel()...
stream.parallel().map(mapper).distinct().filter(filter)...
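Here is a small check of what "exactly equivalent" means in practice. The class name `LastCallWins` is made up for illustration. The execution mode is a single flag on the whole pipeline, and the `java.util.stream` package documentation states that the most recent `sequential()`/`parallel()` call applies to the entire pipeline when the terminal operation runs. The sketch only inspects `isParallel()`, so it does not depend on thread scheduling.

```java
import java.util.stream.Stream;

public class LastCallWins {
    // Reports isParallel() for three pipelines that differ only in
    // where parallel()/sequential() is called.
    static boolean[] flags() {
        Stream<Integer> late = Stream.of(3, 1, 2).map(x -> x * 2).parallel();
        Stream<Integer> early = Stream.of(3, 1, 2).parallel().map(x -> x * 2);
        Stream<Integer> overridden = Stream.of(3, 1, 2).parallel().map(x -> x * 2).sequential();
        return new boolean[] { late.isParallel(), early.isParallel(), overridden.isParallel() };
    }

    public static void main(String[] args) {
        boolean[] f = flags();
        System.out.println(f[0] + " " + f[1] + " " + f[2]); // true true false
    }
}
```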
Suppose I want to build a new custom stream method with the following signature:
public static <T> Stream<T> myMethod(Stream<T> stream)
As a requirement, the method should have the same lazy/late .parallel() behavior. That means the lines within each group should behave exactly the same:
// sequential identical lines:
myMethod(stream).sequential()...
myMethod(stream.sequential())...
// parallel identical lines:
myMethod(stream).parallel()...
myMethod(stream.parallel())...
How can I do that? A simple example would be helpful.
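import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public static <T> Stream<T> myMethod(Stream<T> stream) {
    // Any implementation that changes the stream.
    // For simplicity, assume this extension swaps the elements at odd and even
    // positions, or anything else that is simple and easy to demonstrate.
    // Calling stream.spliterator() here prevents parallel() from being lazy/late!
    Spliterator<T> spliterator = stream.spliterator();
    return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(
            spliterator.estimateSize(), spliterator.characteristics()) {
        // My easy implementation (here: plain delegation)
        @Override public boolean tryAdvance(Consumer<? super T> action) {
            return spliterator.tryAdvance(action);
        }
    }, stream.isParallel());
}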
Note that using .spliterator() and StreamSupport.stream() affects parallel processing, as discussed here: Understanding sequential vs parallel stream spliterators in Java 8 and Java 9
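This minimal sketch shows why the eager `stream.spliterator()` call breaks the requirement. The class name `EagerSpliteratorDemo` is hypothetical. `StreamSupport.stream(...)` starts a brand-new pipeline. A later `.parallel()` therefore only flips the flag of that new pipeline. The upstream pipeline was already turned into a spliterator in whatever mode it had at that moment.

```java
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class EagerSpliteratorDemo {
    // Returns {downstream.isParallel(), upstream.isParallel()} after the
    // downstream pipeline has been switched to parallel "late".
    static boolean[] flagsAfterLateParallel() {
        Stream<Integer> upstream = Stream.of(1, 2, 3).map(x -> x * 2);
        // Eager: the upstream pipeline is consumed into a spliterator right away,
        // as in the first version of myMethod in the question.
        Stream<Integer> downstream =
                StreamSupport.stream(upstream.spliterator(), upstream.isParallel());
        downstream = downstream.parallel(); // only affects the new pipeline
        return new boolean[] { downstream.isParallel(), upstream.isParallel() };
    }

    public static void main(String[] args) {
        boolean[] f = flagsAfterLateParallel();
        System.out.println(f[0] + " " + f[1]); // true false
    }
}
```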
Update: a JUnit 5 test using AssertJ:
import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ForkJoinPool;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

@ParameterizedTest(name="[{index}] {0}/{1}/{2} = {3} --> {4}")
@CsvSource({
    "-,-,-,1!,Sequential: default behaviour",
    "P,-,-,2+,Parallel: set in stage1",
    "-,P,-,2+,Parallel: set in stage2",
    "-,-,P,2+,Parallel: set in stage3",
    "P,S,-,1!,Sequential: set in stage2",
    "P,-,S,1!,Sequential: set in stage3",
    "P,S,P,2+,Parallel: set last in stage3",
    "S,P,S,1!,Sequential: set last in stage3",
})
void myMethodTest(String stage1f, String stage2f, String stage3f, String expThreads, String name) throws Exception {
    // names of the threads that processed elements in each stage
    Set<String> set1 = new ConcurrentSkipListSet<>();
    Set<String> set2 = new ConcurrentSkipListSet<>();
    Set<String> set3 = new ConcurrentSkipListSet<>();
    int parallelism = 4;
    // "1!" = exactly one thread expected, "2+" = between 2 and parallelism threads
    int minExpected = expThreads.equals("1!") ? 1 : 2;
    int maxExpected = expThreads.equals("1!") ? 1 : parallelism;
    // "P" -> parallel(), "S" -> sequential(), "-" -> leave the mode unchanged
    BiFunction<String, Stream<Long>, Stream<Long>> mode = (flag, stream) -> {
        switch (flag) {
            case "P": return stream.parallel();
            case "S": return stream.sequential();
            default: return stream;
        }
    };
    Stream<Long> stage1 = mode.apply(stage1f, LongStream.range(0, 1_000_000).boxed())
            .peek(x -> set1.add(Thread.currentThread().getName()));
    Stream<Long> stage2 = mode.apply(stage2f, myMethod(stage1).map(x -> 2*x))
            .peek(x -> set2.add(Thread.currentThread().getName()));
    Stream<Long> stage3 = mode.apply(stage3f, myMethod(stage2).map(x -> 2*x))
            .peek(x -> set3.add(Thread.currentThread().getName()));
    // run the terminal operation from within a dedicated pool of size parallelism
    new ForkJoinPool(parallelism).submit(() -> {
        List<Long> list = stage3.collect(Collectors.toList());
        System.out.print("list:" + list.size() + " threads:" + parallelism + " flags:" + stage1f + "/" + stage2f + "/" + stage3f + " ");
    }).get();
    System.out.print("stage1:" + set1.size() + "/" + maxExpected + " ");
    System.out.print("stage2:" + set2.size() + "/" + maxExpected + " ");
    System.out.println("stage3:" + set3.size() + "/" + maxExpected + " ");
    assertThat(set1.size()).isBetween(minExpected, maxExpected);
    assertThat(set2.size()).isBetween(minExpected, maxExpected);
    assertThat(set3.size()).isBetween(minExpected, maxExpected);
}
This is tricky, and the following solution may not work in all cases:
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public static <T> Stream<T> myMethod(Stream<T> stream) {
    // for simplification, this does not alter anything from the source stream
    class CustomSpliterator implements Spliterator<T> {
        Spliterator<T> source;
        CustomSpliterator(Spliterator<T> src) {
            source = src;
        }
        @Override public boolean tryAdvance(Consumer<? super T> action) {
            return source.tryAdvance(action);
        }
        @Override public void forEachRemaining(Consumer<? super T> action) {
            source.forEachRemaining(action);
        }
        @Override public Spliterator<T> trySplit() {
            Spliterator<T> sSp = source.trySplit();
            return sSp == null? null: new CustomSpliterator(sSp);
        }
        @Override public long estimateSize() {
            return source.estimateSize();
        }
        @Override public int characteristics() {
            return source.characteristics();
        }
    }
    // Defers the choice of mode: get() is only called once the terminal
    // operation starts, so it can read the final mode of the returned stream.
    class MySpSupplier implements Supplier<Spliterator<T>> {
        Stream<T> downStream;
        @Override public Spliterator<T> get() {
            System.out.println("MySpSupplier.get(): nailing down behavior");
            Stream<T> src=downStream.isParallel()? stream.parallel(): stream.sequential();
            return new CustomSpliterator(src.spliterator());
        }
    }
    MySpSupplier sup = new MySpSupplier();
    Stream<T> s = StreamSupport.stream(sup, Spliterator.ORDERED, stream.isParallel());
    sup.downStream = s;
    return s;
}
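The solution relies on the `StreamSupport.stream(Supplier, int, boolean)` overload. Its Javadoc says the supplier is invoked no more than once, and only after the terminal operation commences. Here is a small sketch of that contract. The class name and data are illustrative. The characteristics are chosen to match what `Spliterators.spliterator(Object[], int)` reports, as the Javadoc asks.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class DeferredSupplierDemo {
    // Returns {supplier calls before the terminal op, calls after it}.
    static int[] supplierCalls() {
        AtomicInteger calls = new AtomicInteger();
        Supplier<Spliterator<String>> supplier = () -> {
            calls.incrementAndGet();
            return Spliterators.spliterator(new String[] {"foo", "bar", "baz"}, Spliterator.ORDERED);
        };
        // ArraySpliterator reports ORDERED | SIZED | SUBSIZED for this input
        int characteristics = Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SUBSIZED;
        Stream<String> s = StreamSupport.stream(supplier, characteristics, false)
                .map(String::toUpperCase)
                .parallel(); // mode can still change: nothing has been evaluated
        int before = calls.get();
        List<String> result = s.collect(Collectors.toList());
        int after = calls.get();
        System.out.println(result); // [FOO, BAR, BAZ]
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] c = supplierCalls();
        System.out.println("before=" + c[0] + " after=" + c[1]); // before=0 after=1
    }
}
```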
but it does achieve the goal, e.g.
Stream<String> s = Stream.of("foo", "bar", "baz");
System.out.println("started with "+(s.isParallel()? "parallel": "sequential")+" stream");
s = s.peek(x -> System.out.println("upstream "+x+": "+Thread.currentThread()));
s = myMethod(s.map(String::toUpperCase));
System.out.println("chaining another ops");
s = s.peek(x -> System.out.println("downstream "+x+": "+Thread.currentThread()));
s = s.filter(str -> str.startsWith("B"));
System.out.println("turning to parallel");
s = s.parallel();
System.out.println("commencing terminal operation");
List<String> l = s.collect(Collectors.toList());
System.out.println("result: "+l);
Output:
started with sequential stream
chaining another ops
turning to parallel
commencing terminal operation
MySpSupplier.get(): nailing down behavior
upstream bar: Thread[main,5,main]
upstream foo: Thread[ForkJoinPool.commonPool-worker-1,5,main]
downstream BAR: Thread[main,5,main]
upstream baz: Thread[ForkJoinPool.commonPool-worker-2,5,main]
downstream FOO: Thread[ForkJoinPool.commonPool-worker-1,5,main]
downstream BAZ: Thread[ForkJoinPool.commonPool-worker-2,5,main]
result: [BAR, BAZ]
or
Stream<String> s = Stream.of("foo", "bar", "baz").parallel();
System.out.println("started with "+(s.isParallel()? "parallel": "sequential")+" stream");
s = s.peek(x -> System.out.println("upstream "+x+": "+Thread.currentThread()));
s = myMethod(s.map(String::toUpperCase));
System.out.println("chaining another ops");
s = s.peek(x -> System.out.println("downstream "+x+": "+Thread.currentThread()));
s = s.filter(str -> str.startsWith("B"));
System.out.println("turning to sequential");
s = s.sequential();
System.out.println("commencing terminal operation");
List<String> l = s.collect(Collectors.toList());
System.out.println("result: "+l);
Output:
started with parallel stream
chaining another ops
turning to sequential
commencing terminal operation
MySpSupplier.get(): nailing down behavior
upstream foo: Thread[main,5,main]
downstream FOO: Thread[main,5,main]
upstream bar: Thread[main,5,main]
downstream BAR: Thread[main,5,main]
upstream baz: Thread[main,5,main]
downstream BAZ: Thread[main,5,main]
result: [BAR, BAZ]
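The thread names in the output above depend on scheduling. A deterministic way to check the same property is to record which mode the upstream pipeline was switched to when the supplier runs. This is a reduced pass-through variant of the `myMethod` idea above, not the answer's exact code. `passThrough`, `upstreamModeFor` and `LateParallelCheck` are hypothetical names. Passing only `Spliterator.ORDERED` mirrors the answer's choice.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LateParallelCheck {
    // Pass-through wrapper; records the mode the upstream was switched to.
    static <T> Stream<T> passThrough(Stream<T> upstream, AtomicReference<Boolean> seenParallel) {
        // Lets the supplier read the returned (head) stage once evaluation starts.
        AtomicReference<Stream<T>> downstream = new AtomicReference<>();
        Supplier<Spliterator<T>> supplier = () -> {
            Stream<T> src = downstream.get().isParallel() ? upstream.parallel() : upstream.sequential();
            seenParallel.set(src.isParallel());
            return src.spliterator();
        };
        Stream<T> result = StreamSupport.stream(supplier, Spliterator.ORDERED, upstream.isParallel());
        downstream.set(result);
        return result;
    }

    // Sets the mode before the wrapper, overrides it after, and reports
    // which mode the upstream pipeline actually ran in.
    static boolean upstreamModeFor(boolean parallelBefore, boolean parallelAfter) {
        AtomicReference<Boolean> seen = new AtomicReference<>();
        Stream<Integer> up = Stream.of(1, 2, 3).map(x -> x * 10);
        if (parallelBefore) {
            up = up.parallel();
        }
        Stream<Integer> down = passThrough(up, seen).map(x -> x + 1);
        down = parallelAfter ? down.parallel() : down.sequential();
        List<Integer> out = down.collect(Collectors.toList());
        if (!out.equals(Arrays.asList(11, 21, 31))) {
            throw new IllegalStateException("unexpected result " + out);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(upstreamModeFor(false, true)); // true
        System.out.println(upstreamModeFor(true, false)); // false
    }
}
```

`upstreamModeFor(false, true)` corresponds to `myMethod(stream).parallel()`. `upstreamModeFor(true, false)` corresponds to `myMethod(stream.parallel()).sequential()`.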