如何 运行 并发作业与依赖任务?
How to run concurrent job with dependent tasks?
我有一个情况需要处理
我有一个 class 有发送方法,例子
@Singleton
class SendReport {
public void send() {}
}
发送方法从用户点击网页时调用,并且必须立即return,但必须启动一系列需要时间的任务
send
->|
| |-> Task1
<-| |
<-|
|
|-> Task2 (can only start when Task1 completes/throws exception)
<-|
|
|-> Task3 (can only start when Task2 completes/throws exception)
<-|
我是 Java 并发世界的新手,正在阅读它。根据我的理解,我需要一个 Executor Service
和 submit()
一个作业 (Task1
) 来处理并让 Future
返回以继续。
我说的对吗?
我理解和设计的困难部分是
- 如何以及在何处处理任何此类任务的异常?
- 据我所知,我必须做类似的事情吗?
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future futureTask1 = executorService.submit(new Callable(){
public Object call() throws Exception {
System.out.println("doing Task1");
return "Task1 Result";
}
});
if (futureTask1.get() != null) {
Future futureTask2 = executorService.submit(new Callable(){
public Object call() throws Exception {
System.out.println("doing Task2");
return "Task2 Result";
}
}
... and so on for Task 3
是否正确?
如果有,有没有更好的推荐方式?
谢谢
您当前的方法将不起作用,因为它会一直阻塞到您想要避免的完全完成。
future.get() 是阻塞();
所以在提交第一个任务后,你的代码会等到它完成,然后提交下一个任务,再次等待,所以与单线程一个一个地执行任务相比没有优势。
所以如果有的话,代码需要是:
Future futureTask2 = executorService.submit(new Callable(){
public Object call() throws Exception {
futureTask1.get()
System.out.println("doing Task2");
return "Task2 Result";
}
}
你的图表表明后续任务应该在异常情况下执行。如果计算出现问题,Get 将抛出 ExecutionException,因此您需要通过适当的尝试来保护 get()。
既然Task1,Task2要一个接一个完成,为什么要分线程执行。为什么不让一个线程使用 运行 方法来处理 Task1、Task2.. 一个接一个。正如您所说的,不是您的 "main" 线程,它可以在执行程序作业中,但可以处理所有任务。
我个人不喜欢匿名内部 类 和回调(这就是您对期货链的模仿)。如果我必须实现任务序列,我实际上会实现任务队列和执行它们的处理器。
主要是因为它是"more manageable",因为我可以监控队列的内容,甚至删除不需要的任务。
所以我会有一个 BlockingQueue<JobDescription>
,我会向其中提交包含任务执行所需的所有数据的 JobDescription。
我将实现线程(处理器),在它们的 运行() 中将具有无限循环,在该循环中它们从队列中获取作业,执行任务,然后将后续任务放回队列中。这些行中的内容。
但是如果任务是在发送方法中预定义的,我会简单地将它们作为一个作业提交,然后在一个线程中执行。如果它们总是顺序的,那么在不同线程之间拆分它们就没有意义了。
如果你只是有一行任务需要在完成前一个任务时调用,而不是在前面的答案中所述和讨论的,我认为你根本不需要多线程。
如果您有一个任务池,其中一些任务需要知道另一个任务的结果而其他任务不关心,那么您可以想出一个依赖的可调用实现。
public class DependentCallable implements Callable {
private final String name;
private final Future pre;
public DependentCallable(String name, Future pre) {
this.name = name;
this.pre = pre;
}
@Override
public Object call() throws Exception {
if (pre != null) {
pre.get();
//pre.get(10, TimeUnit.SECONDS);
}
System.out.println(name);
return name;
}
根据您问题中的代码,您还需要注意一些其他事项,如先前答复中所述,在提交之间删除 future.gets。使用大小至少大于可调用对象之间的依赖深度的线程池。
如果您想return立即发送请求,您需要再添加一项任务。请检查以下示例。它将请求提交给后台线程,后台线程将按顺序执行任务,然后 returns.
3 个长 运行 任务的可调用对象。
public class Task1 implements Callable<String> {
public String call() throws Exception {
Thread.sleep(5000);
System.out.println("Executing Task1...");
return Thread.currentThread().getName();
}
}
public class Task2 implements Callable<String> {
public String call() throws Exception {
Thread.sleep(5000);
System.out.println("Executing Task2...");
return Thread.currentThread().getName();
}
}
public class Task3 implements Callable<String> {
public String call() throws Exception {
Thread.sleep(5000);
System.out.println("Executing Task3...");
return Thread.currentThread().getName();
}
}
Main 方法,立即从客户端和 returns 获取请求,然后开始顺序执行任务。
public class ThreadTest {
public static void main(String[] args) {
final ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.submit(new Runnable() {
public void run() {
try {
Future<String> result1 = executorService.submit(new Task1());
if (result1.get() != null) {
Future<String> result2 = executorService.submit(new Task2());
if (result2.get() != null) {
executorService.submit(new Task3());
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
});
System.out.println("Submitted request...");
}
}
Dependent task execution 使用 Dexecutor
变得简单
免责声明:我是所有者
这里有一个例子,它可以很容易地运行下面的复杂图,你可以参考this了解更多详情
这里是an example
我有一个情况需要处理
我有一个 class 有发送方法,例子
@Singleton
class SendReport {
public void send() {}
}
发送方法从用户点击网页时调用,并且必须立即return,但必须启动一系列需要时间的任务
send
->|
| |-> Task1
<-| |
<-|
|
|-> Task2 (can only start when Task1 completes/throws exception)
<-|
|
|-> Task3 (can only start when Task2 completes/throws exception)
<-|
我是 Java 并发世界的新手,正在阅读它。根据我的理解,我需要一个 Executor Service
和 submit()
一个作业 (Task1
) 来处理并让 Future
返回以继续。
我说的对吗?
我理解和设计的困难部分是
- 如何以及在何处处理任何此类任务的异常?
- 据我所知,我必须做类似的事情吗?
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future futureTask1 = executorService.submit(new Callable(){
public Object call() throws Exception {
System.out.println("doing Task1");
return "Task1 Result";
}
});
if (futureTask1.get() != null) {
Future futureTask2 = executorService.submit(new Callable(){
public Object call() throws Exception {
System.out.println("doing Task2");
return "Task2 Result";
}
}
... and so on for Task 3
是否正确? 如果有,有没有更好的推荐方式?
谢谢
您当前的方法将不起作用,因为它会一直阻塞到您想要避免的完全完成。
future.get() 是阻塞(); 所以在提交第一个任务后,你的代码会等到它完成,然后提交下一个任务,再次等待,所以与单线程一个一个地执行任务相比没有优势。
所以如果有的话,代码需要是:
Future futureTask2 = executorService.submit(new Callable(){
public Object call() throws Exception {
futureTask1.get()
System.out.println("doing Task2");
return "Task2 Result";
}
}
你的图表表明后续任务应该在异常情况下执行。如果计算出现问题,Get 将抛出 ExecutionException,因此您需要通过适当的尝试来保护 get()。
既然Task1,Task2要一个接一个完成,为什么要分线程执行。为什么不让一个线程使用 运行 方法来处理 Task1、Task2.. 一个接一个。正如您所说的,不是您的 "main" 线程,它可以在执行程序作业中,但可以处理所有任务。
我个人不喜欢匿名内部 类 和回调(这就是您对期货链的模仿)。如果我必须实现任务序列,我实际上会实现任务队列和执行它们的处理器。
主要是因为它是"more manageable",因为我可以监控队列的内容,甚至删除不需要的任务。
所以我会有一个 BlockingQueue<JobDescription>
,我会向其中提交包含任务执行所需的所有数据的 JobDescription。
我将实现线程(处理器),在它们的 运行() 中将具有无限循环,在该循环中它们从队列中获取作业,执行任务,然后将后续任务放回队列中。这些行中的内容。
但是如果任务是在发送方法中预定义的,我会简单地将它们作为一个作业提交,然后在一个线程中执行。如果它们总是顺序的,那么在不同线程之间拆分它们就没有意义了。
如果你只是有一行任务需要在完成前一个任务时调用,而不是在前面的答案中所述和讨论的,我认为你根本不需要多线程。
如果您有一个任务池,其中一些任务需要知道另一个任务的结果而其他任务不关心,那么您可以想出一个依赖的可调用实现。
public class DependentCallable implements Callable {
private final String name;
private final Future pre;
public DependentCallable(String name, Future pre) {
this.name = name;
this.pre = pre;
}
@Override
public Object call() throws Exception {
if (pre != null) {
pre.get();
//pre.get(10, TimeUnit.SECONDS);
}
System.out.println(name);
return name;
}
根据您问题中的代码,您还需要注意一些其他事项,如先前答复中所述,在提交之间删除 future.gets。使用大小至少大于可调用对象之间的依赖深度的线程池。
如果您想return立即发送请求,您需要再添加一项任务。请检查以下示例。它将请求提交给后台线程,后台线程将按顺序执行任务,然后 returns.
3 个长 运行 任务的可调用对象。
public class Task1 implements Callable<String> {
public String call() throws Exception {
Thread.sleep(5000);
System.out.println("Executing Task1...");
return Thread.currentThread().getName();
}
}
public class Task2 implements Callable<String> {
public String call() throws Exception {
Thread.sleep(5000);
System.out.println("Executing Task2...");
return Thread.currentThread().getName();
}
}
public class Task3 implements Callable<String> {
public String call() throws Exception {
Thread.sleep(5000);
System.out.println("Executing Task3...");
return Thread.currentThread().getName();
}
}
Main 方法,立即从客户端和 returns 获取请求,然后开始顺序执行任务。
public class ThreadTest {
public static void main(String[] args) {
final ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.submit(new Runnable() {
public void run() {
try {
Future<String> result1 = executorService.submit(new Task1());
if (result1.get() != null) {
Future<String> result2 = executorService.submit(new Task2());
if (result2.get() != null) {
executorService.submit(new Task3());
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
});
System.out.println("Submitted request...");
}
}
Dependent task execution 使用 Dexecutor
变得简单免责声明:我是所有者
这里有一个例子,它可以很容易地运行下面的复杂图,你可以参考this了解更多详情
这里是an example