与 Streams API 的奇怪死锁
Strange dead-lock with Streams API
我正在编写一个程序,并决定为 Java 8 使用新的 Streams API。但是,当我引入 .parallel()
时,我的程序停止工作了。这是相关代码:
import java.math.BigInteger;
import java.util.Objects;
import java.util.stream.Stream;
import com.google.common.cache.*;
public class Alg196 {
public static void main(String[] args) {
// Add .parallel() where suitable
long c = Stream
.iterate(BigInteger.valueOf(101), i -> i.add(BigInteger.ONE))
.limit(100000000).map(BigInteger::toString).map(Alg196::alg196)
.filter(Objects::nonNull).count();
System.err.println(c);
}
private static final String reverse(String n) {
return new StringBuilder(n).reverse().toString();
}
private static final boolean isPalindrome(String s) {
for (int i = 0, j = s.length() - 1; i < j; ++i, --j) {
if (s.charAt(i) != s.charAt(j))
return false;
}
return true;
}
private static final String alg196(String n) {
System.err.println("PROCESSING " + n);
int loops = 0;
while (!isPalindrome(n)) {
n = new BigInteger(n).add(new BigInteger(reverse(n))).toString();
loops++;
if (loops >= 100) {
return null;
}
}
if (loops <= 10) {
return null;
}
return n;
}
}
如果正常工作,输出将包含许多 PROCESSING <x>
行,但 .parallel()
永远不会发生这种情况。这是为什么?
原来 limit
是一个短路操作,这意味着它将在 运行 之前生成所有 100,000,000
BigIntegers
。毕竟不是死锁!
您的程序没有停止 - 它正在努力生成请求的 BigIntegers 范围(在您的情况下是 100000000),然后再将地图任务提交给执行程序(尝试将断点放在 BigInteger::add() 方法上- 你会看到)
从线程转储中也很容易看出
"ForkJoinPool.commonPool-worker-2@710" daemon prio=5 tid=0xe nid=NA runnable java.lang.Thread.State: RUNNABLE
at java.util.stream.Stream.next(Stream.java:1033)
at java.util.Spliterators$IteratorSpliterator.trySplit(Spliterators.java:1784)
at java.util.stream.AbstractShortCircuitTask.compute(AbstractShortCircuitTask.java:114)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:902)
at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1689)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1644)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)`
另外请注意将许多长运行 任务提交到公共 ForkJoin 池,因为您可能会阻塞池中的所有线程 - 您可以查看此线程 (Custom thread pool in Java 8 parallel stream) 以寻求解决方案
我正在编写一个程序,并决定为 Java 8 使用新的 Streams API。但是,当我引入 .parallel()
时,我的程序停止工作了。这是相关代码:
import java.math.BigInteger;
import java.util.Objects;
import java.util.stream.Stream;
import com.google.common.cache.*;
public class Alg196 {
public static void main(String[] args) {
// Add .parallel() where suitable
long c = Stream
.iterate(BigInteger.valueOf(101), i -> i.add(BigInteger.ONE))
.limit(100000000).map(BigInteger::toString).map(Alg196::alg196)
.filter(Objects::nonNull).count();
System.err.println(c);
}
private static final String reverse(String n) {
return new StringBuilder(n).reverse().toString();
}
private static final boolean isPalindrome(String s) {
for (int i = 0, j = s.length() - 1; i < j; ++i, --j) {
if (s.charAt(i) != s.charAt(j))
return false;
}
return true;
}
private static final String alg196(String n) {
System.err.println("PROCESSING " + n);
int loops = 0;
while (!isPalindrome(n)) {
n = new BigInteger(n).add(new BigInteger(reverse(n))).toString();
loops++;
if (loops >= 100) {
return null;
}
}
if (loops <= 10) {
return null;
}
return n;
}
}
如果正常工作,输出将包含许多 PROCESSING <x>
行,但 .parallel()
永远不会发生这种情况。这是为什么?
原来 limit
是一个短路操作,这意味着它将在 运行 之前生成所有 100,000,000
BigIntegers
。毕竟不是死锁!
您的程序没有停止 - 它正在努力生成请求的 BigIntegers 范围(在您的情况下是 100000000),然后再将地图任务提交给执行程序(尝试将断点放在 BigInteger::add() 方法上- 你会看到)
从线程转储中也很容易看出
"ForkJoinPool.commonPool-worker-2@710" daemon prio=5 tid=0xe nid=NA runnable java.lang.Thread.State: RUNNABLE
at java.util.stream.Stream.next(Stream.java:1033)
at java.util.Spliterators$IteratorSpliterator.trySplit(Spliterators.java:1784)
at java.util.stream.AbstractShortCircuitTask.compute(AbstractShortCircuitTask.java:114)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:902)
at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1689)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1644)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)`
另外请注意将许多长运行 任务提交到公共 ForkJoin 池,因为您可能会阻塞池中的所有线程 - 您可以查看此线程 (Custom thread pool in Java 8 parallel stream) 以寻求解决方案