Java ForkJoinPool 线程未完成
Java ForkJoinPool Threads Not Completing
我正在尝试使用 Java 流和 ForkJoinPool 并行化 for 循环,以控制使用的线程数。当 运行 使用单线程时,并行化代码 returns 与顺序程序的结果相同。顺序代码是一组标准的for循环:
for(String file : fileList){
for(String item : xList){
for(String x : aList) {
// action code
}
}
}
下面是我的并行实现:
ForkJoinPool threadPool = new ForkJoinPool(NUM_THREADS);
int chunkSize = aList.size()/NUM_THREADS;
for(String file : fileList){
for(String item : xList){
IntStream.range(0, NUM_THREADS)
.parallel().forEach(i -> threadPool.submit(() -> {
aList.subList(i*chunkSize, Math.min(i*chunkSize + chunkSize -1, aList.size()-1))
.forEach(x -> {
// action code
});
}));
threadPool.shutdown();
threadPool.awaitTermination(5, TimeUnit.MINUTES);
}
}
当使用超过 1 个线程时,仅完成有限次数的迭代。我试图使用 .shutdown()
和 .awaitTermination()
来确保完成所有线程,但这似乎不起作用。发生的迭代次数与 运行 运行(在 0-1500 之间)有很大差异。
注意:我使用的是具有 8 个可用内核(4 个双核)的 Macbook Pro,并且我的操作代码不包含使并行化不安全的引用。
如有任何建议,将不胜感激,谢谢!
我认为您遇到的实际问题是由您在 ForkJoinPool
上调用 shutdown
引起的。如果您查看 javadoc,这会导致 "an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted" - 即。我希望只有一项任务能够真正完成。
顺便说一句,按照您使用它的方式使用 ForkJoinPool
没有实际意义。 ForkJoinPool
旨在递归地拆分工作负载,与您在循环中创建子列表所做的不同——但是 ForkJoinPool
应该由 RecursiveAction
提供,它们自己拆分工作,而不是像在循环中那样事先将其拆分。不过,这只是旁注;你的代码应该 运行 没问题,但如果你只是将你的任务提交给一个正常的 ExecutorService
,它会更清楚,例如你通过 Executors.newFixedThreadPool(parallelism)
而不是 new ForkJoinPool()
获得的任务。
我正在尝试使用 Java 流和 ForkJoinPool 并行化 for 循环,以控制使用的线程数。当 运行 使用单线程时,并行化代码 returns 与顺序程序的结果相同。顺序代码是一组标准的for循环:
for(String file : fileList){
for(String item : xList){
for(String x : aList) {
// action code
}
}
}
下面是我的并行实现:
ForkJoinPool threadPool = new ForkJoinPool(NUM_THREADS);
int chunkSize = aList.size()/NUM_THREADS;
for(String file : fileList){
for(String item : xList){
IntStream.range(0, NUM_THREADS)
.parallel().forEach(i -> threadPool.submit(() -> {
aList.subList(i*chunkSize, Math.min(i*chunkSize + chunkSize -1, aList.size()-1))
.forEach(x -> {
// action code
});
}));
threadPool.shutdown();
threadPool.awaitTermination(5, TimeUnit.MINUTES);
}
}
当使用超过 1 个线程时,仅完成有限次数的迭代。我试图使用 .shutdown()
和 .awaitTermination()
来确保完成所有线程,但这似乎不起作用。发生的迭代次数与 运行 运行(在 0-1500 之间)有很大差异。
注意:我使用的是具有 8 个可用内核(4 个双核)的 Macbook Pro,并且我的操作代码不包含使并行化不安全的引用。
如有任何建议,将不胜感激,谢谢!
我认为您遇到的实际问题是由您在 ForkJoinPool
上调用 shutdown
引起的。如果您查看 javadoc,这会导致 "an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted" - 即。我希望只有一项任务能够真正完成。
顺便说一句,按照您使用它的方式使用 ForkJoinPool
没有实际意义。 ForkJoinPool
旨在递归地拆分工作负载,与您在循环中创建子列表所做的不同——但是 ForkJoinPool
应该由 RecursiveAction
提供,它们自己拆分工作,而不是像在循环中那样事先将其拆分。不过,这只是旁注;你的代码应该 运行 没问题,但如果你只是将你的任务提交给一个正常的 ExecutorService
,它会更清楚,例如你通过 Executors.newFixedThreadPool(parallelism)
而不是 new ForkJoinPool()
获得的任务。