并行转换流时如何使用收集器
How collectors are used when turning the stream in parallel
我实际上试图回答这个问题。所以我虽然这个收集器不能很好地并行工作:
private static Collector<String, ?, List<String>> oddLines() {
int[] counter = {1};
return Collector.of(ArrayList::new,
(l, line) -> {
if (counter[0] % 2 == 1) l.add(line);
counter[0]++;
},
(l1, l2) -> {
l1.addAll(l2);
return l1;
});
}
但它有效。
编辑:它实际上没有用;我被我的输入集太小而无法触发任何并行性这一事实所愚弄;请参阅评论中的讨论。
我认为这行不通,因为我想到了以下两个执行计划。
1。 counter
数组在所有线程之间共享。
线程t1读取了Stream的第一个元素,所以满足if条件。它将第一个元素添加到它的列表中。然后在他有时间更新数组值之前执行停止。
线程 t2,表示从流的第 4 个元素开始,将其添加到其列表中。所以我们最终得到了一个不需要的元素。
当然,既然这个收集器似乎可以工作,我想它不是那样工作的。无论如何更新都不是原子的。
2。每个线程都有自己的数组副本
在这种情况下,更新没有更多问题,但没有什么能阻止我线程 t2 不会从流的第 4 个元素开始。所以他也不是那样工作的。
所以看起来它根本就不是那样工作的,这让我想到了一个问题......收集器是如何并行使用的?
有人可以基本解释一下它是如何工作的,以及为什么我的收集器在 运行 并行时工作吗?
非常感谢!
将 parallel()
源流传递到您的收集器中足以破坏逻辑,因为您的共享状态 (counter
) 可能 从不同的任务中递增.您可以验证这一点,因为它永远不会为任何有限流输入返回正确的结果:
Stream<String> lines = IntStream.range(1, 20000).mapToObj(i -> i + "");
System.out.println(lines.isParallel());
lines = lines.parallel();
System.out.println(lines.isParallel());
List<String> collected = lines.collect(oddLines());
System.out.println(collected.size());
请注意,对于无限流(例如,从 Files.lines()
读取时),您需要 在流中生成大量数据 ,因此它实际上会分叉一个任务同时 运行 一些块。
我的输出是:
false
true
12386
这显然是错误的。
正如@Holger 在评论中正确指出的那样,当您的收集器指定 CONCURRENT
和 UNORDERED
时,可能会发生不同的竞争,在这种情况下,它们在单个共享集合上运行跨任务(ArrayList::new
每个流调用一次),其中 - 仅 parallel()
它将 运行 集合上的累加器 每个任务 然后稍后使用您定义的组合器组合结果。
如果您将特征添加到收集器,由于单个集合中的共享状态,您可能 运行 到以下结果中:
false
true
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 73
at java.util.ArrayList.add(ArrayList.java:459)
at de.jungblut.stuff.StreamPallel.lambda[=12=](StreamPallel.java:18)
at de.jungblut.stuff.StreamPallel$$Lambda/1044036744.accept(Unknown Source)
at java.util.stream.ReferencePipeline.lambda$collect7(ReferencePipeline.java:496)
at java.util.stream.ReferencePipeline$$Lambda/2003749087.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.IntPipeline.accept(IntPipeline.java:250)
at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:496)
at de.jungblut.stuff.StreamPallel.main(StreamPallel.java:32)12386
其实这个收藏家的工作纯属巧合。它不适用于自定义数据源。考虑这个例子:
List<String> list = IntStream.range(0, 10).parallel().mapToObj(String::valueOf)
.collect(oddLines());
System.out.println(list);
这总是会产生不同的结果。真正的原因只是因为当 BufferedReader.lines()
流被至少 java.util.Spliterators.IteratorSpliterator.BATCH_UNIT
行分割时,即 1024。如果你有更多的行数,即使使用 BufferedReader
也可能会失败:
String data = IntStream.range(0, 10000).mapToObj(String::valueOf)
.collect(Collectors.joining("\n"));
List<String> list = new BufferedReader(new StringReader(data)).lines().parallel()
.collect(oddLines());
list.stream().mapToInt(Integer::parseInt).filter(x -> x%2 != 0)
.forEach(System.out::println);
如果收集器正常工作,这应该不会打印任何内容。但有时它会打印出来。
我实际上试图回答这个问题
private static Collector<String, ?, List<String>> oddLines() {
int[] counter = {1};
return Collector.of(ArrayList::new,
(l, line) -> {
if (counter[0] % 2 == 1) l.add(line);
counter[0]++;
},
(l1, l2) -> {
l1.addAll(l2);
return l1;
});
}
但它有效。
编辑:它实际上没有用;我被我的输入集太小而无法触发任何并行性这一事实所愚弄;请参阅评论中的讨论。
我认为这行不通,因为我想到了以下两个执行计划。
1。 counter
数组在所有线程之间共享。
线程t1读取了Stream的第一个元素,所以满足if条件。它将第一个元素添加到它的列表中。然后在他有时间更新数组值之前执行停止。
线程 t2,表示从流的第 4 个元素开始,将其添加到其列表中。所以我们最终得到了一个不需要的元素。
当然,既然这个收集器似乎可以工作,我想它不是那样工作的。无论如何更新都不是原子的。
2。每个线程都有自己的数组副本
在这种情况下,更新没有更多问题,但没有什么能阻止我线程 t2 不会从流的第 4 个元素开始。所以他也不是那样工作的。
所以看起来它根本就不是那样工作的,这让我想到了一个问题......收集器是如何并行使用的?
有人可以基本解释一下它是如何工作的,以及为什么我的收集器在 运行 并行时工作吗?
非常感谢!
将 parallel()
源流传递到您的收集器中足以破坏逻辑,因为您的共享状态 (counter
) 可能 从不同的任务中递增.您可以验证这一点,因为它永远不会为任何有限流输入返回正确的结果:
Stream<String> lines = IntStream.range(1, 20000).mapToObj(i -> i + "");
System.out.println(lines.isParallel());
lines = lines.parallel();
System.out.println(lines.isParallel());
List<String> collected = lines.collect(oddLines());
System.out.println(collected.size());
请注意,对于无限流(例如,从 Files.lines()
读取时),您需要 在流中生成大量数据 ,因此它实际上会分叉一个任务同时 运行 一些块。
我的输出是:
false
true
12386
这显然是错误的。
正如@Holger 在评论中正确指出的那样,当您的收集器指定 CONCURRENT
和 UNORDERED
时,可能会发生不同的竞争,在这种情况下,它们在单个共享集合上运行跨任务(ArrayList::new
每个流调用一次),其中 - 仅 parallel()
它将 运行 集合上的累加器 每个任务 然后稍后使用您定义的组合器组合结果。
如果您将特征添加到收集器,由于单个集合中的共享状态,您可能 运行 到以下结果中:
false
true
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 73
at java.util.ArrayList.add(ArrayList.java:459)
at de.jungblut.stuff.StreamPallel.lambda[=12=](StreamPallel.java:18)
at de.jungblut.stuff.StreamPallel$$Lambda/1044036744.accept(Unknown Source)
at java.util.stream.ReferencePipeline.lambda$collect7(ReferencePipeline.java:496)
at java.util.stream.ReferencePipeline$$Lambda/2003749087.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.IntPipeline.accept(IntPipeline.java:250)
at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:496)
at de.jungblut.stuff.StreamPallel.main(StreamPallel.java:32)12386
其实这个收藏家的工作纯属巧合。它不适用于自定义数据源。考虑这个例子:
List<String> list = IntStream.range(0, 10).parallel().mapToObj(String::valueOf)
.collect(oddLines());
System.out.println(list);
这总是会产生不同的结果。真正的原因只是因为当 BufferedReader.lines()
流被至少 java.util.Spliterators.IteratorSpliterator.BATCH_UNIT
行分割时,即 1024。如果你有更多的行数,即使使用 BufferedReader
也可能会失败:
String data = IntStream.range(0, 10000).mapToObj(String::valueOf)
.collect(Collectors.joining("\n"));
List<String> list = new BufferedReader(new StringReader(data)).lines().parallel()
.collect(oddLines());
list.stream().mapToInt(Integer::parseInt).filter(x -> x%2 != 0)
.forEach(System.out::println);
如果收集器正常工作,这应该不会打印任何内容。但有时它会打印出来。