使用 ExecutorService 重复并行执行多个相似的任务
Using ExecutorService to repeatedly perform a number of similar tasks in parallel
有 Java 代码,简化后如下所示:
while(someCondition)
{
SomeType a = CalcResult(param1);
SomeType b = CalcResult(param2);
SomeType c = CalcResult(param3);
// Do something with a, b and c
}
CalcResult()
很费时间。该应用程序在 SMP 系统上运行。有一种推动力同时尝试 运行 所有三个计算,每个单独 CPU 而不是按顺序 运行 它们。总是需要并行执行这 3 个任务,而不是任意数量的任务(例如算法)。每个任务可能比其他任务花费的时间略多或少一些,但通常差异不会太大 (20-30%)。
因为他们需要 return 结果,所以我从 :
查看了诸如此类的执行器服务解决方案
ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() {
return 2;
}
};
Future<Integer> future = executor.submit(callable);
// future.get() returns 2
executor.shutdown();
由于我使用 Java 的经验主要是在 servlet/JSP 开发中,所以我没有使用线程的经验,也不确定该代码段是否适用于 3 个任务而不是一个。
我如何提交 3 个任务,每个任务都有自己的参数值,并等待所有任务 return 计算结果,同时 确保为它们创建线程不会否定 运行 自身 CPUs 的优势,即有没有办法在 while()
循环开始之前创建一次线程,然后简单地推一个新的 paramN
进入循环内的每个线程,唤醒它们,并等待它们执行所有计算?
Executors.newSingleThreadExecutor()
将只创建一个线程。你要的是Executors.newFixedThreadPool(3)
。在 while 循环之前调用它,因此线程只创建一次。
创建可调用包装器:
class MyCallable implements Callable<V> {
P p;
MyCallable(P parameter) {
p = parameter;
}
V call() {
return CalcResult(p);
}
}
while 循环:
ExecutorService executor = Executors.newFixedThreadPool(3);
while (cond) {
Future<V> aFuture = executor.submit(new MyCallable(param1));
Future<V> bFuture = executor.submit(new MyCallable(param2));
Future<V> cFuture = executor.submit(new MyCallable(param3));
// this will block until all calculations are finished:
a = aFuture.get();
b = bFuture.get();
c = cFuture.get();
// do something with a/b/c, e.g. calc new params.
}
您可以为您的应用程序创建一个执行程序服务,专门用于处理这些计算。池大小可能会有所不同,具体取决于您是否要同时进行 运行 多项计算:
ExecutorService service = Executors.newFixedThreadPool(3);
Future<Integer> submitA = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return processA();
}
});
Future<Integer> submitB = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return processB();
}
});
Future<Integer> submitC = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return processC();
}
});
int result = submitA.get() + submitB.get() + submitC.get();
在这种情况下,您需要确保您没有在每次 运行 计算时都创建线程池,在这种情况下,与保存相比,创建线程池的影响较小 -假设运行宁时间将通过拆分任务来减少。
有 Java 代码,简化后如下所示:
while(someCondition)
{
SomeType a = CalcResult(param1);
SomeType b = CalcResult(param2);
SomeType c = CalcResult(param3);
// Do something with a, b and c
}
CalcResult()
很费时间。该应用程序在 SMP 系统上运行。有一种推动力同时尝试 运行 所有三个计算,每个单独 CPU 而不是按顺序 运行 它们。总是需要并行执行这 3 个任务,而不是任意数量的任务(例如算法)。每个任务可能比其他任务花费的时间略多或少一些,但通常差异不会太大 (20-30%)。
因为他们需要 return 结果,所以我从 :
查看了诸如此类的执行器服务解决方案ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() {
return 2;
}
};
Future<Integer> future = executor.submit(callable);
// future.get() returns 2
executor.shutdown();
由于我使用 Java 的经验主要是在 servlet/JSP 开发中,所以我没有使用线程的经验,也不确定该代码段是否适用于 3 个任务而不是一个。
我如何提交 3 个任务,每个任务都有自己的参数值,并等待所有任务 return 计算结果,同时 确保为它们创建线程不会否定 运行 自身 CPUs 的优势,即有没有办法在 while()
循环开始之前创建一次线程,然后简单地推一个新的 paramN
进入循环内的每个线程,唤醒它们,并等待它们执行所有计算?
Executors.newSingleThreadExecutor()
将只创建一个线程。你要的是Executors.newFixedThreadPool(3)
。在 while 循环之前调用它,因此线程只创建一次。
创建可调用包装器:
class MyCallable implements Callable<V> {
P p;
MyCallable(P parameter) {
p = parameter;
}
V call() {
return CalcResult(p);
}
}
while 循环:
ExecutorService executor = Executors.newFixedThreadPool(3);
while (cond) {
Future<V> aFuture = executor.submit(new MyCallable(param1));
Future<V> bFuture = executor.submit(new MyCallable(param2));
Future<V> cFuture = executor.submit(new MyCallable(param3));
// this will block until all calculations are finished:
a = aFuture.get();
b = bFuture.get();
c = cFuture.get();
// do something with a/b/c, e.g. calc new params.
}
您可以为您的应用程序创建一个执行程序服务,专门用于处理这些计算。池大小可能会有所不同,具体取决于您是否要同时进行 运行 多项计算:
ExecutorService service = Executors.newFixedThreadPool(3);
Future<Integer> submitA = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return processA();
}
});
Future<Integer> submitB = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return processB();
}
});
Future<Integer> submitC = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return processC();
}
});
int result = submitA.get() + submitB.get() + submitC.get();
在这种情况下,您需要确保您没有在每次 运行 计算时都创建线程池,在这种情况下,与保存相比,创建线程池的影响较小 -假设运行宁时间将通过拆分任务来减少。