如何 运行 并发作业与依赖任务?

How to run concurrent job with dependent tasks?

我有一个情况需要处理

我有一个 class 有发送方法,例子

@Singleton
class SendReport {

 public void send() {}
}

发送方法从用户点击网页时调用,并且必须立即return,但必须启动一系列需要时间的任务

send
 ->|
   | |-> Task1
 <-|      |
        <-|
          | 
        |-> Task2 (can only start when Task1 completes/throws exception)
        <-|
          | 
        |-> Task3 (can only start when Task2 completes/throws exception)
        <-|

我是 Java 并发世界的新手,正在阅读它。根据我的理解,我需要一个 Executor Servicesubmit() 一个作业 (Task1) 来处理并让 Future 返回以继续。

我说的对吗?

我理解和设计的困难部分是
- 如何以及在何处处理任何此类任务的异常?
- 据我所知,我必须做类似的事情吗?

    ExecutorService executorService = Executors.newFixedThreadPool(1);
    Future futureTask1 = executorService.submit(new Callable(){
    public Object call() throws Exception {
        System.out.println("doing Task1");
        return "Task1 Result";
    }
    });
    if (futureTask1.get() != null) {
    Future futureTask2 = executorService.submit(new Callable(){
    public Object call() throws Exception {
        System.out.println("doing Task2");
        return "Task2 Result";
    }
    }
    ... and so on for Task 3

是否正确? 如果有,有没有更好的推荐方式?

谢谢

您当前的方法将不起作用,因为它会一直阻塞到您想要避免的完全完成。

future.get() 是阻塞(); 所以在提交第一个任务后,你的代码会等到它完成,然后提交下一个任务,再次等待,所以与单线程一个一个地执行任务相比没有优势。

所以如果有的话,代码需要是:

Future futureTask2 = executorService.submit(new Callable(){
    public Object call() throws Exception {
        futureTask1.get()
        System.out.println("doing Task2");
        return "Task2 Result";
    }
}

你的图表表明后续任务应该在异常情况下执行。如果计算出现问题,Get 将抛出 ExecutionException,因此您需要通过适当的尝试来保护 get()。

既然Task1,Task2要一个接一个完成,为什么要分线程执行。为什么不让一个线程使用 运行 方法来处理 Task1、Task2.. 一个接一个。正如您所说的,不是您的 "main" 线程,它可以在执行程序作业中,但可以处理所有任务。

我个人不喜欢匿名内部 类 和回调(这就是您对期货链的模仿)。如果我必须实现任务序列,我实际上会实现任务队列和执行它们的处理器。

主要是因为它是"more manageable",因为我可以监控队列的内容,甚至删除不需要的任务。

所以我会有一个 BlockingQueue<JobDescription>,我会向其中提交包含任务执行所需的所有数据的 JobDescription。

我将实现线程(处理器),在它们的 运行() 中将具有无限循环,在该循环中它们从队列中获取作业,执行任务,然后将后续任务放回队列中。这些行中的内容。

但是如果任务是在发送方法中预定义的,我会简单地将它们作为一个作业提交,然后在一个线程中执行。如果它们总是顺序的,那么在不同线程之间拆分它们就没有意义了。

如果你只是有一行任务需要在完成前一个任务时调用,而不是在前面的答案中所述和讨论的,我认为你根本不需要多线程。

如果您有一个任务池,其中一些任务需要知道另一个任务的结果而其他任务不关心,那么您可以想出一个依赖的可调用实现。

public class DependentCallable implements Callable {

private final String name;
private final Future pre;

public DependentCallable(String name, Future pre) {
    this.name = name;
    this.pre = pre;
}

@Override
public Object call() throws Exception {
    if (pre != null) {
        pre.get();
        //pre.get(10, TimeUnit.SECONDS);
    }
    System.out.println(name);
    return name;
}

根据您问题中的代码,您还需要注意一些其他事项,如先前答复中所述,在提交之间删除 future.gets。使用大小至少大于可调用对象之间的依赖深度的线程池。

如果您想return立即发送请求,您需要再添加一项任务。请检查以下示例。它将请求提交给后台线程,后台线程将按顺序执行任务,然后 returns.

3 个长 运行 任务的可调用对象。

public class Task1 implements Callable<String> {

    public String call() throws Exception {
        Thread.sleep(5000);
        System.out.println("Executing Task1...");
        return Thread.currentThread().getName();
    }
}

public class Task2 implements Callable<String> {

    public String call() throws Exception {
        Thread.sleep(5000);
        System.out.println("Executing Task2...");
        return Thread.currentThread().getName();
    }
}

public class Task3 implements Callable<String> {

    public String call() throws Exception {
        Thread.sleep(5000);
        System.out.println("Executing Task3...");
        return Thread.currentThread().getName();
    }
}

Main 方法,立即从客户端和 returns 获取请求,然后开始顺序执行任务。

public class ThreadTest {

    public static void main(String[] args) {
        final ExecutorService executorService = Executors.newFixedThreadPool(5);

        executorService.submit(new Runnable() {
            public void run() {
                try {
                    Future<String> result1 = executorService.submit(new Task1());
                    if (result1.get() != null) {
                        Future<String> result2 = executorService.submit(new Task2());
                        if (result2.get() != null) {
                            executorService.submit(new Task3());
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        });

        System.out.println("Submitted request...");
    }

}

Dependent task execution 使用 Dexecutor

变得简单

免责声明:我是所有者

这里有一个例子,它可以很容易地运行下面的复杂图,你可以参考this了解更多详情

这里是an example