Make expensive for loop multithreaded, Java

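I have a problem that I would like to solve with Java's ExecutorService and Future classes. I am currently using a for loop to take many samples from a function that is very expensive for me to compute (each sample can take several minutes). I have a class FunctionEvaluator that evaluates this function for me. The class is expensive to instantiate because it holds a lot of internal memory, so I made it easy to reuse via some internal counters and a reset() method. My current situation looks like this: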

int numSamples = 100;
int amountOfData = 1000000;
double[] data = new double[amountOfData];//Data comes from somewhere...
double[] results = new double[numSamples];
//a lot of memory contained inside the FunctionEvaluator class,
//expensive to initialise
FunctionEvaluator fe = new FunctionEvaluator();

for(int i=0; i<numSamples; i++) {
    results[i] = fe.sampleAt(i, data);//very expensive computation
}

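But I would like to get some multithreading going to speed things up. It should be easy enough, because although every sample shares whatever is in data, the access is read-only and each sample is independent of every other sample. Normally I would have no trouble with this, as I have used Java's Future and ExecutorService before, but never in a context where the Callable had to be reused. So in general, given that I can afford to instantiate n FunctionEvaluators, how would I set this scenario up? Something (very roughly) like this: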

int numSamples = 100;
int amountOfData = 1000000;
int N = 10;

double[] data = new double[amountOfData];//Data comes from somewhere...
double[] results = new double[numSamples];
//a lot of memory contained inside the FunctionEvaluator class,
//expensive to initialise
FunctionEvaluator[] fe = new FunctionEvaluator[N];

for(int i=0; i<numSamples; i++) {
    //Somehow add available FunctionEvaluators to an ExecutorService
    //so that N FunctionEvaluators can run in parallel. When a 
    //FunctionEvaluator is finished, reset then compute a new sample
    //until numSamples samples have been taken.
}

Any help would be greatly appreciated! Many thanks.

EDIT

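So here is a toy example (which doesn't work :P). In this case the "expensive function" I want to sample is just squaring an integer, and the "expensive to instantiate class" that does it for me is called CallableComputation: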

In TestConc.java:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class TestConc {

    public static void main(String[] args) {
        SquareCalculator squareCalculator = new SquareCalculator();
        int numFunctionEvaluators = 2;
        int numSamples = 10;

        ExecutorService executor = Executors.newFixedThreadPool(2);
        CallableComputation c1 = new CallableComputation(2);
        CallableComputation c2 = new CallableComputation(3);

        CallableComputation[] callables = new CallableComputation[numFunctionEvaluators];
        Future<Integer>[] futures = (new Future[numFunctionEvaluators]);
        int[] results = new int[numSamples];

        for(int i=0; i<numFunctionEvaluators; i++) {
            callables[i] = new CallableComputation(i);
            futures[i] = executor.submit(callables[i]);
        }

        futures[0] = executor.submit(c1);
        futures[1] = executor.submit(c2);

        for(int i=numFunctionEvaluators; i<numSamples; ) {
            for(int j=0; j<futures.length; j++) {
                if(futures[j].isDone()) {
                    try {
                        results[i] = futures[j].get();
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                    callables[j].set(i);
                    System.out.printf("Function evaluator %d given %d\n", j, i+1);
                    executor.submit(callables[j]);
                    i++;
                }
            }
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i=0; i<results.length; i++) {
            System.out.printf("res%d=%d, ", i, results[i]);
        }
        System.out.println();
    }

    private static boolean areDone(Future<Integer>[] futures) {
        for(int i=0; i<futures.length; i++) {
            if(!futures[i].isDone()) {
                return false;
            }
        }
        return true;
    }

    private static void printFutures(Future<Integer>[] futures) {
        for (int i=0; i<futures.length; i++) {
            System.out.printf("f%d=%s | ", i, futures[i].isDone()?"done" : "not done");
        }
        System.out.printf("\n");
    }

}

In CallableComputation.java:

import java.util.concurrent.Callable;

public class CallableComputation implements Callable<Integer>{

    int input = 0;

    public CallableComputation(int input) {
        this.input = input;
    }

    public void set(int i) {
        input = i;
    }

    @Override
    public Integer call() throws Exception {
        System.out.printf("currval=%d\n", input);
        Thread.sleep(500);
        return input * input;
    }
}

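No special code changes are needed; you can use the same Callable again and again without any problem. Also, for efficiency, since you say that creating an instance of FunctionEvaluator is expensive, you can use a single instance and make sure that sampleAt is thread-safe. One option is to use only function-local variables and never modify any of the passed arguments while any thread is running.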

Please find a simple example below:

Code snippet:

ExecutorService executor = Executors.newFixedThreadPool(2);
Callable<String> task1 = new Callable<String>() {
    public String call() {
        System.out.println(Thread.currentThread() + "currentThread");
        return null;
    }
};
executor.submit(task1);
executor.submit(task1);
executor.shutdown();

You can wrap the actual work of each FunctionEvaluator in a Callable/Runnable, then use a fixedThreadPool with a queue; after that you only need to submit the target Callable/Runnable to the thread pool.
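A minimal sketch of that idea, assuming the evaluators are not thread-safe and that N of them fit in memory. The FunctionEvaluator here is a hypothetical stand-in (the real class is not shown in the question), and the class name EvaluatorPoolSketch and the helper sampleAll are made up for illustration. The N instances are created once and parked in a BlockingQueue; each task borrows one, resets it, computes a sample, and hands it back:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class EvaluatorPoolSketch {

    // Hypothetical stand-in for the question's FunctionEvaluator.
    static class FunctionEvaluator {
        private int calls = 0; // internal state: one instance must not be used by two threads at once

        void reset() { calls = 0; }

        double sampleAt(int i, double[] data) {
            calls++;
            return data[i % data.length] * i; // placeholder for the expensive computation
        }
    }

    static double[] sampleAll(int numSamples, int n, double[] data) throws Exception {
        // Create the N expensive evaluators once and park them in a queue.
        BlockingQueue<FunctionEvaluator> pool = new ArrayBlockingQueue<>(n);
        for (int k = 0; k < n; k++) {
            pool.add(new FunctionEvaluator());
        }

        ExecutorService executor = Executors.newFixedThreadPool(n);
        try {
            List<Future<Double>> futures = new ArrayList<>();
            for (int i = 0; i < numSamples; i++) {
                final int index = i;
                futures.add(executor.submit(() -> {
                    FunctionEvaluator fe = pool.take(); // borrow an idle evaluator
                    try {
                        fe.reset();
                        return fe.sampleAt(index, data);
                    } finally {
                        pool.put(fe); // hand it back for the next task
                    }
                }));
            }
            double[] results = new double[numSamples];
            for (int i = 0; i < numSamples; i++) {
                results[i] = futures.get(i).get(); // blocks until sample i is ready
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        double[] data = {1.0, 2.0, 3.0};
        System.out.println(Arrays.toString(sampleAll(10, 3, data)));
    }
}
```

Because the thread pool and the evaluator queue have the same size, a running task should always find a free evaluator, and the results array is filled by index, so the order of completion does not matter.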

In Java 8:

double[] result = IntStream.range(0, numSamples)
    .parallel()
    .mapToDouble(i->fe.sampleAt(i, data))
    .toArray();

The question asks how to execute a heavy computational function in parallel by loading as many CPUs as possible.

From the Parallelism tutorial:

Parallel computing involves dividing a problem into subproblems, solving those problems simultaneously (in parallel, with each subproblem running in a separate thread), and then combining the results of the solutions to the subproblems. Java SE provides the fork/join framework, which enables you to more easily implement parallel computing in your applications. However, with this framework, you must specify how the problems are subdivided (partitioned). With aggregate operations, the Java runtime performs this partitioning and combining of solutions for you.

The actual solution consists of:

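  • IntStream.range generates a stream of integers from 0 (inclusive) to numSamples (exclusive).

  • parallel() splits the stream so that it is executed across the available CPUs on the box.

  • mapToDouble() converts the stream of integers into a stream of doubles by applying the lambda expression that does the actual work.

  • toArray() is a terminal operation that aggregates the results and returns them as an array.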
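For reference, here is a self-contained sketch of the same pipeline. The static sampleAt below is a hypothetical, stateless stand-in for fe.sampleAt (the class name ParallelSamplingSketch is also made up); with the real FunctionEvaluator, a single shared instance would only be safe here if sampleAt is thread-safe:

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class ParallelSamplingSketch {

    // Hypothetical stand-in for fe.sampleAt(i, data).
    // It only reads data and keeps no state, so many threads may call it at once.
    static double sampleAt(int i, double[] data) {
        double sum = 0.0;
        for (double d : data) {
            sum += d * i;
        }
        return sum;
    }

    static double[] sampleAll(int numSamples, double[] data) {
        return IntStream.range(0, numSamples)         // indices 0 .. numSamples - 1
                .parallel()                           // evaluate indices on several threads
                .mapToDouble(i -> sampleAt(i, data))  // the expensive call
                .toArray();                           // element i holds the result for index i
    }

    public static void main(String[] args) {
        double[] data = {1.0, 2.0, 3.0};
        System.out.println(Arrays.toString(sampleAll(5, data)));
    }
}
```

With this stand-in, main prints [0.0, 6.0, 12.0, 18.0, 24.0]. The array order follows the index order even though the evaluations may finish in any order.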

I would like to get some multithreading going to speed things up.

Sounds good, but your code is overly complicated. @Pavel has a dead-simple Java 8 solution, but even without Java 8 you can make it a lot easier.

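All you need to do is submit the jobs to the executor and then call get() on each of the Futures that are returned. The Callable class is not necessary, although it does make the code cleaner. But you certainly don't need the arrays, which are a bad pattern anyway because a typo can easily produce an out-of-bounds exception. Stick to collections or Java 8 streams.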

ExecutorService executor = Executors.newFixedThreadPool(2);
List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
for (int i = 0; i < numSamples; i++ ) {
    // start the jobs running in the background
    futureList.add(executor.submit(new CallableComputation(i)));
}
// shutdown executor if done submitting tasks, submitted jobs will keep running
executor.shutdown();
for (Future<Integer> future : futureList) {
    // this will wait for the future to finish, it also throws some exceptions
    Integer result = future.get();
    // add result to a collection or something here
}
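As a rough, compilable version of the above with the exceptions from get() spelled out: this sketch squares integers like the toy example in the question, uses a lambda in place of CallableComputation, and the class name FutureListSketch and the method squares are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureListSketch {

    static List<Integer> squares(int numSamples) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Future<Integer>> futureList = new ArrayList<>();
        for (int i = 0; i < numSamples; i++) {
            final int input = i;
            Callable<Integer> job = () -> input * input; // stands in for CallableComputation
            futureList.add(executor.submit(job));
        }
        // No more tasks will be submitted; the ones already queued keep running.
        executor.shutdown();

        List<Integer> results = new ArrayList<>();
        for (Future<Integer> future : futureList) {
            try {
                results.add(future.get()); // waits for this job to finish
            } catch (ExecutionException e) {
                // The job itself threw; its exception is available as the cause.
                throw new IllegalStateException("sample failed", e.getCause());
            }
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(squares(5));
    }
}
```

Iterating over futureList in submission order keeps the results in the same order as the inputs, so no index bookkeeping is needed.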