如果并行处理,为什么在无穷无尽的数字流中按素数过滤需要永远?

Why is filtering by primality in an inifinite stream of numbers taking forever if processed in parallel?

我正在创建从 2 亿开始的无限整数流,使用朴素素性测试实现过滤此流以生成负载并将结果限制为 10。

Predicate<Integer> isPrime = new Predicate<Integer>() {
    @Override
    public boolean test(Integer n) {
        for (int i = 2; i < n; i++) {
            if (n % i == 0) return false;   
        }
        return true;
    }
};

Stream.iterate(200_000_000, n -> ++n)
    .filter(isPrime)
    .limit(10)
    .forEach(i -> System.out.print(i + " "));

这按预期工作。

现在,如果我在过滤之前添加对 parallel() 的调用,则不会生成任何内容并且处理不会完成。

Stream.iterate(200_000_000, n -> ++n)
    .parallel()
    .filter(isPrime)
    .limit(10)
    .forEach(i -> System.out.print(i + " "));

有人可以为我指出这里发生的事情的正确方向吗?

编辑:我不是在寻找更好的素数测试实现(它是一个很长的 运行 实现),而是为了解释使用并行流的负面影响。

parallel 流花费这么长时间的原因是因为所有并行流都使用 common fork-join thread pool 并且因为您正在提交一个很长的 运行 任务(因为您的实现isPrime 方法效率不高),您阻塞了池中的所有线程,因此所有其他使用并行流的任务都被阻塞。

为了使并行版本更快,您可以更有效地实施isPrime。例如

   Predicate<Integer> isPrime = new Predicate<Integer>() {
        @Override
        public boolean test(Integer n) {
            if(n < 2) return false;
            if(n == 2 || n == 3) return true;
            if(n%2 == 0 || n%3 == 0) return false;
            long sqrtN = (long)Math.sqrt(n)+1;
            for(long i = 6L; i <= sqrtN; i += 6) {
                if(n%(i-1) == 0 || n%(i+1) == 0) return false;
            }
            return true;
        }
    };

您会立即注意到性能的提高。当池中存在阻塞线程的可能性时,通常避免使用并行流

处理实际完成,但可能需要相当长的时间,具体取决于计算机上的硬件线程数。 API documentation about limit 警告说它对于并行流来说可能很慢。

实际上,并行流首先根据可用的并行度将计算分成几个部分,对每个部分执行一次计算,然后将结果连接在一起。你的任务有多少部分?每个普通 FJP 线程 (=Runtime.getRuntime().availableProcessors()) 加上(有时?)一个用于当前线程(如果它不在 FJP 中)。你可以控制它添加

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");

实际上,对于您的任务,您设置的数字越小,计算速度就越快。

如何拆分无限任务?您的特定任务由 IteratorSpliterator 处理,该 trySplit 方法创建从 1024 开始不断增加的大小块。您可以自己尝试:

Spliterator<Integer> spliterator = Stream.iterate(200_000_000, n -> ++n).spliterator();
Spliterator[] spliterators = new Spliterator[10];
for(int i=0; i<spliterators.length; i++) {
    spliterators[i] = spliterator.trySplit();
}
for(int i=0; i<spliterators.length; i++) {
    System.out.print((i+1)+": ");
    spliterators[i].tryAdvance(System.out::println);
}       

因此第一个块处理范围为 200000000-200001023 的数字,第二个块处理范围为 200001024-200003071 的数字,依此类推。如果您只有 1 个硬件线程,您的任务将被分成两个块,因此将检查 3072。如果你有 8 个硬件线程,你的任务将被分成 9 个块,并检查 46080 个数字。只有在处理完所有块后,并行计算才会停止。将任务分成如此大的块的启发式方法在您的情况下效果不佳,但如果该区域周围的质数在几千个数字中出现一次,您会看到性能提升。

可能您的特定场景可以在内部进行优化(即,如果第一个线程发现限制条件已经达到,则停止计算)。随时向 Java 错误跟踪器报告错误。


Update 在进一步深入 Stream API 之后,我得出结论,当前行为是一个错误,raised an issue and posted a patch。该补丁很可能会被 JDK9 接受,甚至可能被移植到 JDK 8u 分支。加上我的补丁,并行版本仍然没有提高性能,但至少它的工作时间与顺序流工作时间相当。