与 Streams API 的奇怪死锁

Strange dead-lock with Streams API

我正在编写一个程序,并决定为 Java 8 使用新的 Streams API。但是,当我引入 .parallel() 时,我的程序停止工作了。这是相关代码:

import java.math.BigInteger;
import java.util.Objects;
import java.util.stream.Stream;

import com.google.common.cache.*;

public class Alg196 {

    public static void main(String[] args) {
        // Add .parallel() where suitable
        long c = Stream
                .iterate(BigInteger.valueOf(101), i -> i.add(BigInteger.ONE))
                .limit(100000000).map(BigInteger::toString).map(Alg196::alg196)
                .filter(Objects::nonNull).count();
        System.err.println(c);
    }

    private static final String reverse(String n) {
        return new StringBuilder(n).reverse().toString();
    }

    private static final boolean isPalindrome(String s) {
        for (int i = 0, j = s.length() - 1; i < j; ++i, --j) {
            if (s.charAt(i) != s.charAt(j))
                return false;
        }
        return true;
    }

    private static final String alg196(String n) {
        System.err.println("PROCESSING " + n);
        int loops = 0;
        while (!isPalindrome(n)) {
            n = new BigInteger(n).add(new BigInteger(reverse(n))).toString();
            loops++;
            if (loops >= 100) {
                return null;
            }
        }
        if (loops <= 10) {
            return null;
        }
        return n;
    }
}

如果正常工作,输出将包含许多 PROCESSING <x> 行,但 .parallel() 永远不会发生这种情况。这是为什么?

原来 limit 是一个短路操作,这意味着它将在 运行 之前生成所有 100,000,000 BigIntegers。毕竟不是死锁!

您的程序没有停止 - 它正在努力生成请求的 BigIntegers 范围(在您的情况下是 100000000),然后再将地图任务提交给执行程序(尝试将断点放在 BigInteger::add() 方法上- 你会看到)

从线程转储中也很容易看出

"ForkJoinPool.commonPool-worker-2@710" daemon prio=5 tid=0xe nid=NA runnable java.lang.Thread.State: RUNNABLE
at java.util.stream.Stream.next(Stream.java:1033)
  at java.util.Spliterators$IteratorSpliterator.trySplit(Spliterators.java:1784)
  at java.util.stream.AbstractShortCircuitTask.compute(AbstractShortCircuitTask.java:114)
  at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
  at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
  at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:902)
  at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1689)
  at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1644)
  at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)`

另外请注意将许多长运行 任务提交到公共 ForkJoin 池,因为您可能会阻塞池中的所有线程 - 您可以查看此线程 (Custom thread pool in Java 8 parallel stream) 以寻求解决方案