如何在异步线程 return 回调之前保持线程执行

How to hold the thread execution until asynchronous thread return callback

我的场景如下图所示

这里主线程是my java application.it开一个WM线程执行。 WM处理任务execution.he需要调用任务号执行。 假设它包含任务 T1,T2,T3

T3 依赖于 T2,T2 依赖于 T1。 WM首先调用RM执行T1的任务执行。 T1可以在寻呼中响应,也可以在T1完成后响应。

问题是如何等待 T1 完成然后启动 T2 execution.and 当 T1 部分完成在分页中发送数据时如何通知 WM。

这是简单的场景,但在 T1、T2、T3、T4 的情况下。 T3 依赖于 T1 和 T2。

代码:

public class TestAsync implements TaskCallBack {
    public static ExecutorService exService = Executors.newFixedThreadPool(5);
    public static void main(String args[]) throws InterruptedException, ExecutionException{
        Task t1 = new Task();
        t1.doTask(new TestAsync());

    }

    public static ExecutorService getPool(){
        return exService;
    }

    @Override
    public void taskCompleted(String obj) {
        System.out.println(obj);
    }
}

class Task {
 public void doTask(TaskCallBack tcb) throws InterruptedException, ExecutionException{
     FutureTask<String> ft = new FutureTask<>(new Task1());
     TestAsync.getPool().execute(ft);
     tcb.taskCompleted(ft.get());
 }

}

class Task1 implements Callable<String>{

    @Override
    public String call() throws Exception {
        System.out.println(Thread.currentThread().getName());               
        return "done";
    }

  interface TaskCallBack{
      public void TaskCompleted(String obj);
  }

}

如果您知道 T1 和 T2 的任务数,则可以使用 CountDownLatch。您最好的选择可能是在将 T2 任务添加到执行程序之前等待,直到所有 T1 任务都已完成(T2 -> T3 相同)。

如果您不能更改添加代码,您也可以让每个 T2 任务等待所有 T1 任务完成,但如果添加任务,这将导致活跃度问题(执行程序线程池除了休眠线程外什么都没有)无序。

这个话题很有趣。我在开发高度并行的网络数据包处理解决方案时遇到了类似的问题。我将分享我的发现,但在此之前我应该​​说,对任何并行系统使用某种临时解决方案总是一个坏主意。

如果没有适当的架构支持,调试、优化和进一步开发可能会成为一场噩梦。假设我们有三个相关任务:

第一个解决方案

将引入composite or compound task抽象,以让依赖的任务按正确的顺序执行并摆脱延迟,waiting/blocking,复杂的任务管理等

我将使用简化的代码来说明这种方法:

/**
 * Defines a high-level task contract. 
 * Let's pretend it is enough to have it this simple.
 */
interface Task extends Runnable {

}

/**
 * Defines a simple way to group dependent tasks.
 * 
 * Please note, this is the simplest way to make sure dependent tasks will be
 * executed in a specified order without any additional overhead.
 */
class CompoundTasks implements Task {

    private List<Task> tasks = ...;

    public void add(Task task) {
        tasks.add(task);
    }

    @Override
    public void run() {
        for(Task t : tasks) {
           t.run();
        }
    }        
}

第二种解法

将让任务具有明确的依赖关系并让执行者意识到这一点。基本上,规则很简单——如果任务有未解决的依赖关系,它应该被推迟。这种方法可以很容易地实现并且工作得很好。

请注意,由于需要一些资源来验证任务、管理队列等,第二种解决方案将引入微小的性能损失。

让我们改进基于任务的方法:

/**
 * Defines yet another abstraction to make dependencies 
 * visible and properly tracked. 
 * 
 */
abstract class DependentTask implements Task {

    private List<DependentTask> dependencies = ...;

    public void addDependency(DependentTask task) {
        dependencies.add(task);
    }

    /**
     * Verifies task can be processed. 
     */
    public boolean hasDependenciesResolved() {
        boolean result = true;
        for(DependentTask t : dependencies) {
            if(!t.hasDependenciesResolved()) {
                result = false;
                break;
            }
        }
        return result;
    }

    @Override
    public abstract void run();
}

/**
 * Implements a very basic queue aware of task dependencies.
 * 
 * Basically, the idea is just to avoid any blocking state. If task can't
 * be processed (because of unresolved dependencies) it should be 
 * postponed and verified later.
 */
class TaskQueue<T extends DependentTask> implements Runnable {        
    private Queue<T> queue = ...;

    @Override
    public void run() {
        while(true) {
            if(!queue.isEmpty()) {

                T task = queue.poll();

                // Verify all dependencies have been resolved.
                if(task.hasDependenciesResolved()) {
                    task.run();         // process task if there is no unresolved
                                        // dependencies
                }else{
                    queue.add(task);    // return task to the queue
                }

            }else{
                // sleep for some reasonable amount of time
            }
        }
    }        
}

这两种方法都很容易追踪,因此您始终能够了解正在发生的事情。