来自 HashSet 的并行流不会 运行 并行
Parallel stream from a HashSet doesn't run in parallel
我有一组要并行处理的元素。当我使用 List
时,并行性有效。但是,当我使用 Set
时,它不会并行 运行。
我写了一个显示问题的代码示例:
public static void main(String[] args) {
ParallelTest test = new ParallelTest();
List<Integer> list = Arrays.asList(1,2);
Set<Integer> set = new HashSet<>(list);
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
System.out.println("set print");
try {
forkJoinPool.submit(() ->
set.parallelStream().forEach(test::print)
).get();
} catch (Exception e) {
return;
}
System.out.println("\n\nlist print");
try {
forkJoinPool.submit(() ->
list.parallelStream().forEach(test::print)
).get();
} catch (Exception e) {
return;
}
}
private void print(int i){
System.out.println("start: " + i);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println("end: " + i);
}
这是我在 windows 7
上得到的输出
set print
start: 1
end: 1
start: 2
end: 2
list print
start: 2
start: 1
end: 1
end: 2
我们可以看到 Set
中的第一个元素必须在处理第二个元素之前完成。对于 List
,第二个元素在第一个元素完成之前开始。
你能告诉我是什么原因导致了这个问题,以及如何使用 Set
集合来避免它吗?
我可以重现您看到的行为,其中并行度与您指定的 fork-join 池并行度的并行度不匹配。将 fork-join 池并行度设置为 10 并将集合中的元素数量增加到 50 后,我看到基于列表的流的并行度仅上升到 6,而基于集合的流的并行度从未超过2.
但是请注意,这种将任务提交到分叉连接池以 运行 该池中的并行流的技术是一种实现 "trick" 而 不是保证 正常工作。实际上,用于执行并行流的线程或线程池是未指定。默认情况下,使用通用的 fork-join 池,但在不同的环境中,最终可能会使用不同的线程池。 (考虑应用服务器中的容器。)
在 java.util.stream.AbstractTask class 中,LEAF_TARGET
字段决定了完成的拆分量,进而决定了可以实现的并行度。该字段的值基于 ForkJoinPool.getCommonPoolParallelism()
,它当然使用公共池的并行性,而不是恰好 运行 执行任务的任何池。
可以说这是一个错误(参见 OpenJDK 问题 JDK-8190974),但是,无论如何,这整个区域都未指定。然而,系统的这个区域肯定需要开发,例如在拆分策略、可用并行度、处理阻塞任务等方面。 JDK 的未来版本可能会解决其中的一些问题。
同时,可以通过使用系统属性来控制公共fork-join 池的并行度。如果将此行添加到程序中,
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");
而你 运行 公共池中的流(或者如果你将它们提交到你自己的具有足够高的并行度集的池)你会发现更多的任务是 运行 并行.
您还可以使用 -D
选项在命令行上设置此 属性。
再次强调,这不是保证的行为,将来可能会改变。但是在可预见的将来,这种技术可能会用于 JDK 8 种实现。
更新 2019-06-12: 错误 JDK-8190974 已在 JDK 10 中修复,并且该修复已被移植到即将推出的 JDK 8u 版本 (8u222).
我有一组要并行处理的元素。当我使用 List
时,并行性有效。但是,当我使用 Set
时,它不会并行 运行。
我写了一个显示问题的代码示例:
public static void main(String[] args) {
ParallelTest test = new ParallelTest();
List<Integer> list = Arrays.asList(1,2);
Set<Integer> set = new HashSet<>(list);
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
System.out.println("set print");
try {
forkJoinPool.submit(() ->
set.parallelStream().forEach(test::print)
).get();
} catch (Exception e) {
return;
}
System.out.println("\n\nlist print");
try {
forkJoinPool.submit(() ->
list.parallelStream().forEach(test::print)
).get();
} catch (Exception e) {
return;
}
}
private void print(int i){
System.out.println("start: " + i);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println("end: " + i);
}
这是我在 windows 7
上得到的输出set print
start: 1
end: 1
start: 2
end: 2
list print
start: 2
start: 1
end: 1
end: 2
我们可以看到 Set
中的第一个元素必须在处理第二个元素之前完成。对于 List
,第二个元素在第一个元素完成之前开始。
你能告诉我是什么原因导致了这个问题,以及如何使用 Set
集合来避免它吗?
我可以重现您看到的行为,其中并行度与您指定的 fork-join 池并行度的并行度不匹配。将 fork-join 池并行度设置为 10 并将集合中的元素数量增加到 50 后,我看到基于列表的流的并行度仅上升到 6,而基于集合的流的并行度从未超过2.
但是请注意,这种将任务提交到分叉连接池以 运行 该池中的并行流的技术是一种实现 "trick" 而 不是保证 正常工作。实际上,用于执行并行流的线程或线程池是未指定。默认情况下,使用通用的 fork-join 池,但在不同的环境中,最终可能会使用不同的线程池。 (考虑应用服务器中的容器。)
在 java.util.stream.AbstractTask class 中,LEAF_TARGET
字段决定了完成的拆分量,进而决定了可以实现的并行度。该字段的值基于 ForkJoinPool.getCommonPoolParallelism()
,它当然使用公共池的并行性,而不是恰好 运行 执行任务的任何池。
可以说这是一个错误(参见 OpenJDK 问题 JDK-8190974),但是,无论如何,这整个区域都未指定。然而,系统的这个区域肯定需要开发,例如在拆分策略、可用并行度、处理阻塞任务等方面。 JDK 的未来版本可能会解决其中的一些问题。
同时,可以通过使用系统属性来控制公共fork-join 池的并行度。如果将此行添加到程序中,
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");
而你 运行 公共池中的流(或者如果你将它们提交到你自己的具有足够高的并行度集的池)你会发现更多的任务是 运行 并行.
您还可以使用 -D
选项在命令行上设置此 属性。
再次强调,这不是保证的行为,将来可能会改变。但是在可预见的将来,这种技术可能会用于 JDK 8 种实现。
更新 2019-06-12: 错误 JDK-8190974 已在 JDK 10 中修复,并且该修复已被移植到即将推出的 JDK 8u 版本 (8u222).