连接并行流

Concatenating parallel streams

假设我有两个 int[] 数组 input1input2。我只想从第一个中获取正数,从第二个中获取不同的数字,将它们合并在一起,排序并存储到结果数组中。这可以使用流来执行:

int[] result = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0), 
                   Arrays.stream(input2).distinct()).sorted().toArray();

我想加速任务,所以我考虑让流并行。通常这只是意味着我可以在流构造和终端操作之间的任何位置插入 .parallel(),结果将是相同的。 IntStream.concat 的 JavaDoc 表示如果任何输入流是并行的,则结果流将是并行的。所以我认为制作 parallel() input1 流或 input2 流或串联流将产生相同的结果。

实际上我错了:如果我将 .parallel() 添加到结果流中,输入流似乎保持顺序。此外,我可以将输入流(其中之一或两者)标记为 .parallel(),然后将结果流变为 .sequential(),但输入保持并行。所以实际上有 8 种可能:input1、input2 和 concatenated stream 中的任何一个可以并行或不并行:

int[] sss = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
                Arrays.stream(input2).distinct()).sorted().toArray();
int[] ssp = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
                Arrays.stream(input2).distinct()).parallel().sorted().toArray();
int[] sps = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0), 
                Arrays.stream(input2).parallel().distinct()).sequential().sorted().toArray();
int[] spp = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0), 
                Arrays.stream(input2).parallel().distinct()).sorted().toArray();
int[] pss = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).distinct()).sequential().sorted().toArray();
int[] psp = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).distinct()).sorted().toArray();
int[] pps = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).parallel().distinct()).sequential().sorted().toArray();
int[] ppp = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).parallel().distinct()).sorted().toArray();

benchmarked 不同输入大小的所有版本(在 Core i5 4xCPU、Win7 上使用 JDK 8u45 64 位)并在每种情况下得到不同的结果:

Benchmark           (n)  Mode  Cnt       Score       Error  Units
ConcatTest.SSS      100  avgt   20       7.094 ±     0.069  us/op
ConcatTest.SSS    10000  avgt   20    1542.820 ±    22.194  us/op
ConcatTest.SSS  1000000  avgt   20  350173.723 ±  7140.406  us/op
ConcatTest.SSP      100  avgt   20       6.176 ±     0.043  us/op
ConcatTest.SSP    10000  avgt   20     907.855 ±     8.448  us/op
ConcatTest.SSP  1000000  avgt   20  264193.679 ±  6744.169  us/op
ConcatTest.SPS      100  avgt   20      16.548 ±     0.175  us/op
ConcatTest.SPS    10000  avgt   20    1831.569 ±    13.582  us/op
ConcatTest.SPS  1000000  avgt   20  500736.204 ± 37932.197  us/op
ConcatTest.SPP      100  avgt   20      23.871 ±     0.285  us/op
ConcatTest.SPP    10000  avgt   20    1141.273 ±     9.310  us/op
ConcatTest.SPP  1000000  avgt   20  400582.847 ± 27330.492  us/op
ConcatTest.PSS      100  avgt   20       7.162 ±     0.241  us/op
ConcatTest.PSS    10000  avgt   20    1593.332 ±     7.961  us/op
ConcatTest.PSS  1000000  avgt   20  383920.286 ±  6650.890  us/op
ConcatTest.PSP      100  avgt   20       9.877 ±     0.382  us/op
ConcatTest.PSP    10000  avgt   20     883.639 ±    13.596  us/op
ConcatTest.PSP  1000000  avgt   20  257921.422 ±  7649.434  us/op
ConcatTest.PPS      100  avgt   20      16.412 ±     0.129  us/op
ConcatTest.PPS    10000  avgt   20    1816.782 ±    10.875  us/op
ConcatTest.PPS  1000000  avgt   20  476311.713 ± 19154.558  us/op
ConcatTest.PPP      100  avgt   20      23.078 ±     0.622  us/op
ConcatTest.PPP    10000  avgt   20    1128.889 ±     7.964  us/op
ConcatTest.PPP  1000000  avgt   20  393699.222 ± 56397.445  us/op

根据这些结果,我只能得出结论,distinct() 步骤的并行化会降低整体性能(至少在我的测试中如此)。

所以我有以下问题:

  1. 关于如何更好地使用串联流的并行化,是否有任何官方指南?测试所有可能的组合并不总是可行的(特别是当连接两个以上的流时),所以有一些 "rules of thumb" 会很好。
  2. 似乎如果我连接直接从 collection/array 创建的流(在连接之前没有执行中间操作),那么结果不太依赖于 parallel() 的位置。这是真的吗?
  3. 除了串联之外,是否还有其他情况的结果取决于流管道并行化的时间点?

该规范准确地描述了您得到的内容 — 当您考虑到这一点时,与其他操作不同,我们谈论的不是单个管道,而是三个不同的 Streams,它们保留其属性独立于其他操作。

规范说:“如果任一输入流是并行的,则结果流是[...]并行的。”这就是你得到的;如果 input 流是并行的,则 resulting 流是并行的(但您可以随后将其转换为顺序流)。但是将 resulting 流更改为并行或顺序流不会改变 input 流的性质,也不会将并行流和顺序流馈送到 concat.

关于性能后果,请参阅 documentation, paragraph “Stream operations and pipelines”:

Intermediate operations are further divided into stateless and stateful operations. Stateless operations, such as filter and map, retain no state from previously seen element when processing a new element -- each element can be processed independently of operations on other elements. Stateful operations, such as distinct and sorted, may incorporate state from previously seen elements when processing new elements.

Stateful operations may need to process the entire input before producing a result. For example, one cannot produce any results from sorting a stream until one has seen all elements of the stream. As a result, under parallel computation, some pipelines containing stateful intermediate operations may require multiple passes on the data or may need to buffer significant data. Pipelines containing exclusively stateless intermediate operations can be processed in a single pass, whether sequential or parallel, with minimal data buffering.

您选择了两个名为 stateful 的操作并将它们组合在一起。因此,结果流的 .sorted() 操作需要对整个内容进行缓冲,然后才能开始排序,这意味着 distinct 操作已完成。不同的操作显然很难并行化,因为线程必须同步已经看到的值。

所以要回答你的第一个问题,这与 concat 无关,而只是 distinct 无法从并行执行中受益。

这也会使您的第二个问题过时,因为您在两个串联的流中执行完全不同的操作,因此您不能对预串联的 collection/array 执行相同的操作。在生成的数组上连接数组和 运行 distinct 不太可能产生更好的结果。

关于你的第三个问题,flatMap 关于 parallel 流的行为可能是一个惊喜的来源......