为什么 ForkJoinPool::invoke() 会阻塞主线程?
Why does ForkJoinPool::invoke() block the main thread?
免责声明:这是我第一次使用 Java 的 Fork-Join 框架,所以我不能 100% 确定我是否正确使用了它。 Java 也不是我的主要编程语言,所以这也可能是相关的。
给出以下 SSCCE:
import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
class ForkCalculator extends RecursiveAction
{
private final Integer[] delayTasks;
public ForkCalculator(Integer[] delayTasks)
{
this.delayTasks = delayTasks;
}
@Override
protected void compute()
{
if (this.delayTasks.length == 1) {
this.computeDirectly();
return;
}
Integer halfway = this.delayTasks.length / 2;
ForkJoinTask.invokeAll(
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, 0, halfway)
),
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, halfway, this.delayTasks.length)
)
);
}
private void computeDirectly()
{
Integer delayTask = this.delayTasks[0];
try {
Thread.sleep(delayTask);
} catch (InterruptedException ex) {
System.err.println(ex.getMessage());
System.exit(2);
}
System.out.println("Finished computing task with delay " + delayTask);
}
}
public final class ForkJoinBlocker
{
public static void main(String[] args)
{
ForkCalculator calculator = new ForkCalculator(
new Integer[]{1500, 1400, 1950, 2399, 4670, 880, 5540, 1975, 3010, 4180, 2290, 1940, 510}
);
ForkJoinPool pool = new ForkJoinPool(
Runtime.getRuntime().availableProcessors()
);
pool.invoke(calculator);
//make it a daemon thread
Timer timer = new Timer(true);
timer.scheduleAtFixedRate(
new TimerTask() {
@Override
public void run()
{
System.out.println(pool.toString());
}
},
100,
2000
);
}
}
所以我创建了一个 ForkJoinPool
,我向其提交了一些任务,这些任务进行了一些处理。出于本示例的目的,为了简单起见,我将它们替换为 Thread.sleep()
。
在我的实际程序中,这是一个很长的任务列表,所以我想定期在标准输出上打印当前状态。我尝试使用预定的 TimerTask
.
在单独的线程上执行此操作
但是,我注意到了一些出乎我意料的事情:在我的示例中,输出类似于:
Finished computing task with delay 1500
Finished computing task with delay 2399
Finished computing task with delay 1400
Finished computing task with delay 4180
Finished computing task with delay 1950
Finished computing task with delay 5540
Finished computing task with delay 880
.......
这意味着 "status-task" 永远不会执行。
但是,如果我修改我的代码以在最后移动 pool.invoke(calculator);
,那么它会按预期工作:
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 1500
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 2399
Finished computing task with delay 1400
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 4, submissions = 0]
Finished computing task with delay 4180
Finished computing task with delay 1950
......
我能得出的唯一结论是 ForkJoinPool::invoke()
阻塞了主线程(只有 returns 池中的所有任务都完成后)。
我希望主线程中的代码继续执行,而 fork-join-pool 中的任务被处理异步。
我的问题是:出现这种情况是不是因为我错误地使用了框架?我的代码中有什么地方必须更正吗?
我注意到 ForkJoinPool
的构造函数之一有一个 boolean asyncMode
参数,但是,从实现中我可以看出,这只是在 FIFO_QUEUE
和 [=23] 之间做出决定=] 执行模式(不确定是什么):
public ForkJoinPool(
int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode
) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
基本上 invoke()
会等待整个任务完成后再返回,所以是的,主线程正在阻塞。在那之后,Timer
没有时间执行,因为它在守护线程上运行。
您可以简单地使用 execute()
而不是异步运行任务的 invoke()
。然后你可以在ForkJoinTask
上join()
等待结果,期间Timer
会是运行:
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
pool.execute(calculator);
//make it a daemon thread
Timer timer = new Timer(true);
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
System.out.println(pool.toString());
}
}, 100, 2000);
calculator.join(); // wait for computation
免责声明:这是我第一次使用 Java 的 Fork-Join 框架,所以我不能 100% 确定我是否正确使用了它。 Java 也不是我的主要编程语言,所以这也可能是相关的。
给出以下 SSCCE:
import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
class ForkCalculator extends RecursiveAction
{
private final Integer[] delayTasks;
public ForkCalculator(Integer[] delayTasks)
{
this.delayTasks = delayTasks;
}
@Override
protected void compute()
{
if (this.delayTasks.length == 1) {
this.computeDirectly();
return;
}
Integer halfway = this.delayTasks.length / 2;
ForkJoinTask.invokeAll(
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, 0, halfway)
),
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, halfway, this.delayTasks.length)
)
);
}
private void computeDirectly()
{
Integer delayTask = this.delayTasks[0];
try {
Thread.sleep(delayTask);
} catch (InterruptedException ex) {
System.err.println(ex.getMessage());
System.exit(2);
}
System.out.println("Finished computing task with delay " + delayTask);
}
}
public final class ForkJoinBlocker
{
public static void main(String[] args)
{
ForkCalculator calculator = new ForkCalculator(
new Integer[]{1500, 1400, 1950, 2399, 4670, 880, 5540, 1975, 3010, 4180, 2290, 1940, 510}
);
ForkJoinPool pool = new ForkJoinPool(
Runtime.getRuntime().availableProcessors()
);
pool.invoke(calculator);
//make it a daemon thread
Timer timer = new Timer(true);
timer.scheduleAtFixedRate(
new TimerTask() {
@Override
public void run()
{
System.out.println(pool.toString());
}
},
100,
2000
);
}
}
所以我创建了一个 ForkJoinPool
,我向其提交了一些任务,这些任务进行了一些处理。出于本示例的目的,为了简单起见,我将它们替换为 Thread.sleep()
。
在我的实际程序中,这是一个很长的任务列表,所以我想定期在标准输出上打印当前状态。我尝试使用预定的 TimerTask
.
但是,我注意到了一些出乎我意料的事情:在我的示例中,输出类似于:
Finished computing task with delay 1500
Finished computing task with delay 2399
Finished computing task with delay 1400
Finished computing task with delay 4180
Finished computing task with delay 1950
Finished computing task with delay 5540
Finished computing task with delay 880
.......
这意味着 "status-task" 永远不会执行。
但是,如果我修改我的代码以在最后移动 pool.invoke(calculator);
,那么它会按预期工作:
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 1500
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 2399
Finished computing task with delay 1400
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 4, submissions = 0]
Finished computing task with delay 4180
Finished computing task with delay 1950
......
我能得出的唯一结论是 ForkJoinPool::invoke()
阻塞了主线程(只有 returns 池中的所有任务都完成后)。
我希望主线程中的代码继续执行,而 fork-join-pool 中的任务被处理异步。
我的问题是:出现这种情况是不是因为我错误地使用了框架?我的代码中有什么地方必须更正吗?
我注意到 ForkJoinPool
的构造函数之一有一个 boolean asyncMode
参数,但是,从实现中我可以看出,这只是在 FIFO_QUEUE
和 [=23] 之间做出决定=] 执行模式(不确定是什么):
public ForkJoinPool(
int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode
) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
基本上 invoke()
会等待整个任务完成后再返回,所以是的,主线程正在阻塞。在那之后,Timer
没有时间执行,因为它在守护线程上运行。
您可以简单地使用 execute()
而不是异步运行任务的 invoke()
。然后你可以在ForkJoinTask
上join()
等待结果,期间Timer
会是运行:
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
pool.execute(calculator);
//make it a daemon thread
Timer timer = new Timer(true);
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
System.out.println(pool.toString());
}
}, 100, 2000);
calculator.join(); // wait for computation