如果并行处理,为什么在无穷无尽的数字流中按素数过滤需要永远?
Why is filtering by primality in an inifinite stream of numbers taking forever if processed in parallel?
我正在创建从 2 亿开始的无限整数流,使用朴素素性测试实现过滤此流以生成负载并将结果限制为 10。
Predicate<Integer> isPrime = new Predicate<Integer>() {
@Override
public boolean test(Integer n) {
for (int i = 2; i < n; i++) {
if (n % i == 0) return false;
}
return true;
}
};
Stream.iterate(200_000_000, n -> ++n)
.filter(isPrime)
.limit(10)
.forEach(i -> System.out.print(i + " "));
这按预期工作。
现在,如果我在过滤之前添加对 parallel() 的调用,则不会生成任何内容并且处理不会完成。
Stream.iterate(200_000_000, n -> ++n)
.parallel()
.filter(isPrime)
.limit(10)
.forEach(i -> System.out.print(i + " "));
有人可以为我指出这里发生的事情的正确方向吗?
编辑:我不是在寻找更好的素数测试实现(它是一个很长的 运行 实现),而是为了解释使用并行流的负面影响。
parallel
流花费这么长时间的原因是因为所有并行流都使用 common fork-join thread pool
并且因为您正在提交一个很长的 运行 任务(因为您的实现isPrime
方法效率不高),您阻塞了池中的所有线程,因此所有其他使用并行流的任务都被阻塞。
为了使并行版本更快,您可以更有效地实施isPrime。例如
Predicate<Integer> isPrime = new Predicate<Integer>() {
@Override
public boolean test(Integer n) {
if(n < 2) return false;
if(n == 2 || n == 3) return true;
if(n%2 == 0 || n%3 == 0) return false;
long sqrtN = (long)Math.sqrt(n)+1;
for(long i = 6L; i <= sqrtN; i += 6) {
if(n%(i-1) == 0 || n%(i+1) == 0) return false;
}
return true;
}
};
您会立即注意到性能的提高。当池中存在阻塞线程的可能性时,通常避免使用并行流
处理实际完成,但可能需要相当长的时间,具体取决于计算机上的硬件线程数。 API documentation about limit 警告说它对于并行流来说可能很慢。
实际上,并行流首先根据可用的并行度将计算分成几个部分,对每个部分执行一次计算,然后将结果连接在一起。你的任务有多少部分?每个普通 FJP 线程 (=Runtime.getRuntime().availableProcessors()
) 加上(有时?)一个用于当前线程(如果它不在 FJP 中)。你可以控制它添加
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");
实际上,对于您的任务,您设置的数字越小,计算速度就越快。
如何拆分无限任务?您的特定任务由 IteratorSpliterator 处理,该 trySplit 方法创建从 1024 开始不断增加的大小块。您可以自己尝试:
Spliterator<Integer> spliterator = Stream.iterate(200_000_000, n -> ++n).spliterator();
Spliterator[] spliterators = new Spliterator[10];
for(int i=0; i<spliterators.length; i++) {
spliterators[i] = spliterator.trySplit();
}
for(int i=0; i<spliterators.length; i++) {
System.out.print((i+1)+": ");
spliterators[i].tryAdvance(System.out::println);
}
因此第一个块处理范围为 200000000-200001023 的数字,第二个块处理范围为 200001024-200003071 的数字,依此类推。如果您只有 1 个硬件线程,您的任务将被分成两个块,因此将检查 3072。如果你有 8 个硬件线程,你的任务将被分成 9 个块,并检查 46080 个数字。只有在处理完所有块后,并行计算才会停止。将任务分成如此大的块的启发式方法在您的情况下效果不佳,但如果该区域周围的质数在几千个数字中出现一次,您会看到性能提升。
可能您的特定场景可以在内部进行优化(即,如果第一个线程发现限制条件已经达到,则停止计算)。随时向 Java 错误跟踪器报告错误。
Update 在进一步深入 Stream API 之后,我得出结论,当前行为是一个错误,raised an issue and posted a patch。该补丁很可能会被 JDK9 接受,甚至可能被移植到 JDK 8u 分支。加上我的补丁,并行版本仍然没有提高性能,但至少它的工作时间与顺序流工作时间相当。
我正在创建从 2 亿开始的无限整数流,使用朴素素性测试实现过滤此流以生成负载并将结果限制为 10。
Predicate<Integer> isPrime = new Predicate<Integer>() {
@Override
public boolean test(Integer n) {
for (int i = 2; i < n; i++) {
if (n % i == 0) return false;
}
return true;
}
};
Stream.iterate(200_000_000, n -> ++n)
.filter(isPrime)
.limit(10)
.forEach(i -> System.out.print(i + " "));
这按预期工作。
现在,如果我在过滤之前添加对 parallel() 的调用,则不会生成任何内容并且处理不会完成。
Stream.iterate(200_000_000, n -> ++n)
.parallel()
.filter(isPrime)
.limit(10)
.forEach(i -> System.out.print(i + " "));
有人可以为我指出这里发生的事情的正确方向吗?
编辑:我不是在寻找更好的素数测试实现(它是一个很长的 运行 实现),而是为了解释使用并行流的负面影响。
parallel
流花费这么长时间的原因是因为所有并行流都使用 common fork-join thread pool
并且因为您正在提交一个很长的 运行 任务(因为您的实现isPrime
方法效率不高),您阻塞了池中的所有线程,因此所有其他使用并行流的任务都被阻塞。
为了使并行版本更快,您可以更有效地实施isPrime。例如
Predicate<Integer> isPrime = new Predicate<Integer>() {
@Override
public boolean test(Integer n) {
if(n < 2) return false;
if(n == 2 || n == 3) return true;
if(n%2 == 0 || n%3 == 0) return false;
long sqrtN = (long)Math.sqrt(n)+1;
for(long i = 6L; i <= sqrtN; i += 6) {
if(n%(i-1) == 0 || n%(i+1) == 0) return false;
}
return true;
}
};
您会立即注意到性能的提高。当池中存在阻塞线程的可能性时,通常避免使用并行流
处理实际完成,但可能需要相当长的时间,具体取决于计算机上的硬件线程数。 API documentation about limit 警告说它对于并行流来说可能很慢。
实际上,并行流首先根据可用的并行度将计算分成几个部分,对每个部分执行一次计算,然后将结果连接在一起。你的任务有多少部分?每个普通 FJP 线程 (=Runtime.getRuntime().availableProcessors()
) 加上(有时?)一个用于当前线程(如果它不在 FJP 中)。你可以控制它添加
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");
实际上,对于您的任务,您设置的数字越小,计算速度就越快。
如何拆分无限任务?您的特定任务由 IteratorSpliterator 处理,该 trySplit 方法创建从 1024 开始不断增加的大小块。您可以自己尝试:
Spliterator<Integer> spliterator = Stream.iterate(200_000_000, n -> ++n).spliterator();
Spliterator[] spliterators = new Spliterator[10];
for(int i=0; i<spliterators.length; i++) {
spliterators[i] = spliterator.trySplit();
}
for(int i=0; i<spliterators.length; i++) {
System.out.print((i+1)+": ");
spliterators[i].tryAdvance(System.out::println);
}
因此第一个块处理范围为 200000000-200001023 的数字,第二个块处理范围为 200001024-200003071 的数字,依此类推。如果您只有 1 个硬件线程,您的任务将被分成两个块,因此将检查 3072。如果你有 8 个硬件线程,你的任务将被分成 9 个块,并检查 46080 个数字。只有在处理完所有块后,并行计算才会停止。将任务分成如此大的块的启发式方法在您的情况下效果不佳,但如果该区域周围的质数在几千个数字中出现一次,您会看到性能提升。
可能您的特定场景可以在内部进行优化(即,如果第一个线程发现限制条件已经达到,则停止计算)。随时向 Java 错误跟踪器报告错误。
Update 在进一步深入 Stream API 之后,我得出结论,当前行为是一个错误,raised an issue and posted a patch。该补丁很可能会被 JDK9 接受,甚至可能被移植到 JDK 8u 分支。加上我的补丁,并行版本仍然没有提高性能,但至少它的工作时间与顺序流工作时间相当。