连接并行流
Concatenating parallel streams
假设我有两个 int[]
数组 input1
和 input2
。我只想从第一个中获取正数,从第二个中获取不同的数字,将它们合并在一起,排序并存储到结果数组中。这可以使用流来执行:
int[] result = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
Arrays.stream(input2).distinct()).sorted().toArray();
我想加速任务,所以我考虑让流并行。通常这只是意味着我可以在流构造和终端操作之间的任何位置插入 .parallel()
,结果将是相同的。 IntStream.concat 的 JavaDoc 表示如果任何输入流是并行的,则结果流将是并行的。所以我认为制作 parallel()
input1
流或 input2
流或串联流将产生相同的结果。
实际上我错了:如果我将 .parallel()
添加到结果流中,输入流似乎保持顺序。此外,我可以将输入流(其中之一或两者)标记为 .parallel()
,然后将结果流变为 .sequential()
,但输入保持并行。所以实际上有 8 种可能:input1、input2 和 concatenated stream 中的任何一个可以并行或不并行:
int[] sss = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
Arrays.stream(input2).distinct()).sorted().toArray();
int[] ssp = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
Arrays.stream(input2).distinct()).parallel().sorted().toArray();
int[] sps = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
Arrays.stream(input2).parallel().distinct()).sequential().sorted().toArray();
int[] spp = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
Arrays.stream(input2).parallel().distinct()).sorted().toArray();
int[] pss = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
Arrays.stream(input2).distinct()).sequential().sorted().toArray();
int[] psp = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
Arrays.stream(input2).distinct()).sorted().toArray();
int[] pps = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
Arrays.stream(input2).parallel().distinct()).sequential().sorted().toArray();
int[] ppp = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
Arrays.stream(input2).parallel().distinct()).sorted().toArray();
我 benchmarked 不同输入大小的所有版本(在 Core i5 4xCPU、Win7 上使用 JDK 8u45 64 位)并在每种情况下得到不同的结果:
Benchmark (n) Mode Cnt Score Error Units
ConcatTest.SSS 100 avgt 20 7.094 ± 0.069 us/op
ConcatTest.SSS 10000 avgt 20 1542.820 ± 22.194 us/op
ConcatTest.SSS 1000000 avgt 20 350173.723 ± 7140.406 us/op
ConcatTest.SSP 100 avgt 20 6.176 ± 0.043 us/op
ConcatTest.SSP 10000 avgt 20 907.855 ± 8.448 us/op
ConcatTest.SSP 1000000 avgt 20 264193.679 ± 6744.169 us/op
ConcatTest.SPS 100 avgt 20 16.548 ± 0.175 us/op
ConcatTest.SPS 10000 avgt 20 1831.569 ± 13.582 us/op
ConcatTest.SPS 1000000 avgt 20 500736.204 ± 37932.197 us/op
ConcatTest.SPP 100 avgt 20 23.871 ± 0.285 us/op
ConcatTest.SPP 10000 avgt 20 1141.273 ± 9.310 us/op
ConcatTest.SPP 1000000 avgt 20 400582.847 ± 27330.492 us/op
ConcatTest.PSS 100 avgt 20 7.162 ± 0.241 us/op
ConcatTest.PSS 10000 avgt 20 1593.332 ± 7.961 us/op
ConcatTest.PSS 1000000 avgt 20 383920.286 ± 6650.890 us/op
ConcatTest.PSP 100 avgt 20 9.877 ± 0.382 us/op
ConcatTest.PSP 10000 avgt 20 883.639 ± 13.596 us/op
ConcatTest.PSP 1000000 avgt 20 257921.422 ± 7649.434 us/op
ConcatTest.PPS 100 avgt 20 16.412 ± 0.129 us/op
ConcatTest.PPS 10000 avgt 20 1816.782 ± 10.875 us/op
ConcatTest.PPS 1000000 avgt 20 476311.713 ± 19154.558 us/op
ConcatTest.PPP 100 avgt 20 23.078 ± 0.622 us/op
ConcatTest.PPP 10000 avgt 20 1128.889 ± 7.964 us/op
ConcatTest.PPP 1000000 avgt 20 393699.222 ± 56397.445 us/op
根据这些结果,我只能得出结论,distinct()
步骤的并行化会降低整体性能(至少在我的测试中如此)。
所以我有以下问题:
- 关于如何更好地使用串联流的并行化,是否有任何官方指南?测试所有可能的组合并不总是可行的(特别是当连接两个以上的流时),所以有一些 "rules of thumb" 会很好。
- 似乎如果我连接直接从 collection/array 创建的流(在连接之前没有执行中间操作),那么结果不太依赖于
parallel()
的位置。这是真的吗?
- 除了串联之外,是否还有其他情况的结果取决于流管道并行化的时间点?
该规范准确地描述了您得到的内容 — 当您考虑到这一点时,与其他操作不同,我们谈论的不是单个管道,而是三个不同的 Stream
s,它们保留其属性独立于其他操作。
规范说:“如果任一输入流是并行的,则结果流是[...]并行的。”这就是你得到的;如果 input 流是并行的,则 resulting 流是并行的(但您可以随后将其转换为顺序流)。但是将 resulting 流更改为并行或顺序流不会改变 input 流的性质,也不会将并行流和顺序流馈送到 concat
.
关于性能后果,请参阅 documentation, paragraph “Stream operations and pipelines”:
Intermediate operations are further divided into stateless and stateful operations. Stateless operations, such as filter
and map
, retain no state from previously seen element when processing a new element -- each element can be processed independently of operations on other elements. Stateful operations, such as distinct
and sorted
, may incorporate state from previously seen elements when processing new elements.
Stateful operations may need to process the entire input before producing a result. For example, one cannot produce any results from sorting a stream until one has seen all elements of the stream. As a result, under parallel computation, some pipelines containing stateful intermediate operations may require multiple passes on the data or may need to buffer significant data. Pipelines containing exclusively stateless intermediate operations can be processed in a single pass, whether sequential or parallel, with minimal data buffering.
您选择了两个名为 stateful 的操作并将它们组合在一起。因此,结果流的 .sorted()
操作需要对整个内容进行缓冲,然后才能开始排序,这意味着 distinct
操作已完成。不同的操作显然很难并行化,因为线程必须同步已经看到的值。
所以要回答你的第一个问题,这与 concat
无关,而只是 distinct
无法从并行执行中受益。
这也会使您的第二个问题过时,因为您在两个串联的流中执行完全不同的操作,因此您不能对预串联的 collection/array 执行相同的操作。在生成的数组上连接数组和 运行 distinct
不太可能产生更好的结果。
关于你的第三个问题,flatMap
关于 parallel
流的行为可能是一个惊喜的来源......
假设我有两个 int[]
数组 input1
和 input2
。我只想从第一个中获取正数,从第二个中获取不同的数字,将它们合并在一起,排序并存储到结果数组中。这可以使用流来执行:
int[] result = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
Arrays.stream(input2).distinct()).sorted().toArray();
我想加速任务,所以我考虑让流并行。通常这只是意味着我可以在流构造和终端操作之间的任何位置插入 .parallel()
,结果将是相同的。 IntStream.concat 的 JavaDoc 表示如果任何输入流是并行的,则结果流将是并行的。所以我认为制作 parallel()
input1
流或 input2
流或串联流将产生相同的结果。
实际上我错了:如果我将 .parallel()
添加到结果流中,输入流似乎保持顺序。此外,我可以将输入流(其中之一或两者)标记为 .parallel()
,然后将结果流变为 .sequential()
,但输入保持并行。所以实际上有 8 种可能:input1、input2 和 concatenated stream 中的任何一个可以并行或不并行:
int[] sss = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
Arrays.stream(input2).distinct()).sorted().toArray();
int[] ssp = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
Arrays.stream(input2).distinct()).parallel().sorted().toArray();
int[] sps = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
Arrays.stream(input2).parallel().distinct()).sequential().sorted().toArray();
int[] spp = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
Arrays.stream(input2).parallel().distinct()).sorted().toArray();
int[] pss = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
Arrays.stream(input2).distinct()).sequential().sorted().toArray();
int[] psp = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
Arrays.stream(input2).distinct()).sorted().toArray();
int[] pps = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
Arrays.stream(input2).parallel().distinct()).sequential().sorted().toArray();
int[] ppp = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
Arrays.stream(input2).parallel().distinct()).sorted().toArray();
我 benchmarked 不同输入大小的所有版本(在 Core i5 4xCPU、Win7 上使用 JDK 8u45 64 位)并在每种情况下得到不同的结果:
Benchmark (n) Mode Cnt Score Error Units
ConcatTest.SSS 100 avgt 20 7.094 ± 0.069 us/op
ConcatTest.SSS 10000 avgt 20 1542.820 ± 22.194 us/op
ConcatTest.SSS 1000000 avgt 20 350173.723 ± 7140.406 us/op
ConcatTest.SSP 100 avgt 20 6.176 ± 0.043 us/op
ConcatTest.SSP 10000 avgt 20 907.855 ± 8.448 us/op
ConcatTest.SSP 1000000 avgt 20 264193.679 ± 6744.169 us/op
ConcatTest.SPS 100 avgt 20 16.548 ± 0.175 us/op
ConcatTest.SPS 10000 avgt 20 1831.569 ± 13.582 us/op
ConcatTest.SPS 1000000 avgt 20 500736.204 ± 37932.197 us/op
ConcatTest.SPP 100 avgt 20 23.871 ± 0.285 us/op
ConcatTest.SPP 10000 avgt 20 1141.273 ± 9.310 us/op
ConcatTest.SPP 1000000 avgt 20 400582.847 ± 27330.492 us/op
ConcatTest.PSS 100 avgt 20 7.162 ± 0.241 us/op
ConcatTest.PSS 10000 avgt 20 1593.332 ± 7.961 us/op
ConcatTest.PSS 1000000 avgt 20 383920.286 ± 6650.890 us/op
ConcatTest.PSP 100 avgt 20 9.877 ± 0.382 us/op
ConcatTest.PSP 10000 avgt 20 883.639 ± 13.596 us/op
ConcatTest.PSP 1000000 avgt 20 257921.422 ± 7649.434 us/op
ConcatTest.PPS 100 avgt 20 16.412 ± 0.129 us/op
ConcatTest.PPS 10000 avgt 20 1816.782 ± 10.875 us/op
ConcatTest.PPS 1000000 avgt 20 476311.713 ± 19154.558 us/op
ConcatTest.PPP 100 avgt 20 23.078 ± 0.622 us/op
ConcatTest.PPP 10000 avgt 20 1128.889 ± 7.964 us/op
ConcatTest.PPP 1000000 avgt 20 393699.222 ± 56397.445 us/op
根据这些结果,我只能得出结论,distinct()
步骤的并行化会降低整体性能(至少在我的测试中如此)。
所以我有以下问题:
- 关于如何更好地使用串联流的并行化,是否有任何官方指南?测试所有可能的组合并不总是可行的(特别是当连接两个以上的流时),所以有一些 "rules of thumb" 会很好。
- 似乎如果我连接直接从 collection/array 创建的流(在连接之前没有执行中间操作),那么结果不太依赖于
parallel()
的位置。这是真的吗? - 除了串联之外,是否还有其他情况的结果取决于流管道并行化的时间点?
该规范准确地描述了您得到的内容 — 当您考虑到这一点时,与其他操作不同,我们谈论的不是单个管道,而是三个不同的 Stream
s,它们保留其属性独立于其他操作。
规范说:“如果任一输入流是并行的,则结果流是[...]并行的。”这就是你得到的;如果 input 流是并行的,则 resulting 流是并行的(但您可以随后将其转换为顺序流)。但是将 resulting 流更改为并行或顺序流不会改变 input 流的性质,也不会将并行流和顺序流馈送到 concat
.
关于性能后果,请参阅 documentation, paragraph “Stream operations and pipelines”:
Intermediate operations are further divided into stateless and stateful operations. Stateless operations, such as
filter
andmap
, retain no state from previously seen element when processing a new element -- each element can be processed independently of operations on other elements. Stateful operations, such asdistinct
andsorted
, may incorporate state from previously seen elements when processing new elements.Stateful operations may need to process the entire input before producing a result. For example, one cannot produce any results from sorting a stream until one has seen all elements of the stream. As a result, under parallel computation, some pipelines containing stateful intermediate operations may require multiple passes on the data or may need to buffer significant data. Pipelines containing exclusively stateless intermediate operations can be processed in a single pass, whether sequential or parallel, with minimal data buffering.
您选择了两个名为 stateful 的操作并将它们组合在一起。因此,结果流的 .sorted()
操作需要对整个内容进行缓冲,然后才能开始排序,这意味着 distinct
操作已完成。不同的操作显然很难并行化,因为线程必须同步已经看到的值。
所以要回答你的第一个问题,这与 concat
无关,而只是 distinct
无法从并行执行中受益。
这也会使您的第二个问题过时,因为您在两个串联的流中执行完全不同的操作,因此您不能对预串联的 collection/array 执行相同的操作。在生成的数组上连接数组和 运行 distinct
不太可能产生更好的结果。
关于你的第三个问题,flatMap
关于 parallel
流的行为可能是一个惊喜的来源......