如何避免 congesting/stalling/deadlocking 具有递归可调用的 executorservice
How to avoid congesting/stalling/deadlocking an executorservice with recursive callable
ExecutorService 中的所有线程都忙于等待执行程序服务队列中卡住的任务。
示例代码:
ExecutorService es=Executors.newFixedThreadPool(8);
Set<Future<Set<String>>> outerSet=new HashSet<>();
for(int i=0;i<8;i++){
outerSet.add(es.submit(new Callable<Set<String>>() {
@Override
public Set<String> call() throws Exception {
Thread.sleep(10000); //to simulate work
Set<Future<String>> innerSet=new HashSet<>();
for(int j=0;j<8;j++) {
int k=j;
innerSet.add(es.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "number "+k+" in inner loop";
}
}));
}
Set<String> out=new HashSet<>();
while(!innerSet.isEmpty()) { //we are stuck at this loop because all the
for(Future<String> f:innerSet) { //callable in innerSet are stuckin the queue
if(f.isDone()) { //of es and can't start since all the threads
out.add(f.get()); //in es are busy waiting for them to finish
}
}
}
return out;
}
}));
}
除了为每一层创建更多线程池或使用大小不固定的线程池之外,还有什么方法可以避免这种情况吗?
一个实际的例子是,如果一些可调用对象被提交给 ForkJoinPool.commonPool(),然后这些任务使用的对象也通过它们的一种方法提交给 commonPool。
我建议尝试通过 CompletableFuture
分解工作以使用完成阶段
CompletableFuture.supplyAsync(outerTask)
.thenCompose(CompletableFuture.allOf(innerTasks)
这样你的外部任务就不会在处理内部任务时占用执行线程,但你仍然会得到一个在整个工作完成后解析的 Future。如果这些阶段耦合得太紧密,可能很难将它们分开。
你所建议的方法基本上是基于假设如果线程数多于任务数则有可能的解决方案,如果你已经分配了一个线程,这里将不起作用水池。你可以试试看。正如您在代码注释中所述,这是一个简单的死锁案例。
在这种情况下,使用两个独立的线程池,一个用于外部,另一个用于内部。当内部池中的任务完成时,只需 return 将值返回到外部。
或者您可以简单地动态创建一个线程,在其中完成工作,获取结果,然后 return 将其传回外部。
你应该使用 ForkJoinPool
。就是为这种情况而生的。
虽然您的解决方案在等待其子任务完成时永久阻塞线程,但窃取 ForkJoinPool
的工作可以在 join()
中执行工作。这使得它对于这些类型的情况非常有效,在这些情况下,您可能有可变数量的小型(并且通常是递归的)任务 运行。对于常规线程池,您需要加大它的大小,以确保您不会 运行 线程不足。
对于 CompletableFuture
,您需要自己处理更多实际的 planning/scheduling,如果您决定进行更改,调整起来会更加复杂。对于 FJP
,您唯一需要调整的是池中的线程数量,对于 CF
,您还需要考虑 then
与 thenAsync
。
ExecutorService 中的所有线程都忙于等待执行程序服务队列中卡住的任务。
示例代码:
ExecutorService es=Executors.newFixedThreadPool(8);
Set<Future<Set<String>>> outerSet=new HashSet<>();
for(int i=0;i<8;i++){
outerSet.add(es.submit(new Callable<Set<String>>() {
@Override
public Set<String> call() throws Exception {
Thread.sleep(10000); //to simulate work
Set<Future<String>> innerSet=new HashSet<>();
for(int j=0;j<8;j++) {
int k=j;
innerSet.add(es.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "number "+k+" in inner loop";
}
}));
}
Set<String> out=new HashSet<>();
while(!innerSet.isEmpty()) { //we are stuck at this loop because all the
for(Future<String> f:innerSet) { //callable in innerSet are stuckin the queue
if(f.isDone()) { //of es and can't start since all the threads
out.add(f.get()); //in es are busy waiting for them to finish
}
}
}
return out;
}
}));
}
除了为每一层创建更多线程池或使用大小不固定的线程池之外,还有什么方法可以避免这种情况吗?
一个实际的例子是,如果一些可调用对象被提交给 ForkJoinPool.commonPool(),然后这些任务使用的对象也通过它们的一种方法提交给 commonPool。
我建议尝试通过 CompletableFuture
CompletableFuture.supplyAsync(outerTask)
.thenCompose(CompletableFuture.allOf(innerTasks)
这样你的外部任务就不会在处理内部任务时占用执行线程,但你仍然会得到一个在整个工作完成后解析的 Future。如果这些阶段耦合得太紧密,可能很难将它们分开。
你所建议的方法基本上是基于假设如果线程数多于任务数则有可能的解决方案,如果你已经分配了一个线程,这里将不起作用水池。你可以试试看。正如您在代码注释中所述,这是一个简单的死锁案例。
在这种情况下,使用两个独立的线程池,一个用于外部,另一个用于内部。当内部池中的任务完成时,只需 return 将值返回到外部。
或者您可以简单地动态创建一个线程,在其中完成工作,获取结果,然后 return 将其传回外部。
你应该使用 ForkJoinPool
。就是为这种情况而生的。
虽然您的解决方案在等待其子任务完成时永久阻塞线程,但窃取 ForkJoinPool
的工作可以在 join()
中执行工作。这使得它对于这些类型的情况非常有效,在这些情况下,您可能有可变数量的小型(并且通常是递归的)任务 运行。对于常规线程池,您需要加大它的大小,以确保您不会 运行 线程不足。
对于 CompletableFuture
,您需要自己处理更多实际的 planning/scheduling,如果您决定进行更改,调整起来会更加复杂。对于 FJP
,您唯一需要调整的是池中的线程数量,对于 CF
,您还需要考虑 then
与 thenAsync
。