同时求和的最佳方法

Best way to sum concurrently

我正在尝试计算一些大数字。为了加快计算速度,我想使用多线程。每个线程都要算一个数,最后算一个和。

我曾经看到过使用 SumThreadCollector 的东西,如下所示:

public BigInteger compute(int p) {
    Collector c = new Collector(p);

    for(T element : Collection<T> bigCollection) {
        new SumThread(c) {

            @Override
            protected void doTheJob() {
                long big = someVeryComplexCalculation(element, ...); //n!
                receive(BigInteger.valueOf(big));
            }

        }
    }

    if(collector.isReady())
        return collector.getResult();

    return null;
}

public class Collector {

    private int numberOfProcesses;
    private int numberOfAllowedProcesses;
    private BigInteger result;

    public Collector(int n) {
        numberOfAllowedProcesses = n;
        numberOfProcesses = 0;
        result = BigInteger.ZERO;
    }

    synchronized public void enter() throws InterruptedException {
        if (numberOfProcesses == numberOfAllowedProcesses) wait();
        numberOfProcesses++;
    }

    synchronized public void leave() {
        numberOfProcesses--;
        notify();
    }

    synchronized public void register(BigInteger v) {
        result = result.add(v);
    }

    synchronized public boolean isReady() throws InterruptedException {
        while (numberOfProcesses > 0) wait();
        return true;
    }

    ...
}

public abstract class SumThread extends Thread {

    private Collector collector;

    public SumThread(Collector c) throws InterruptedException {
        collector = c;
        collector.enter();
    }

    abstract protected void doTheJob(); //complex calculations can be done in here

    public void receive(BigInteger t) {
        collector.register(t);
    }

    public void run() {
        doTheJob();
        collector.leave();
    }
}

我认为我可以通过使用 ExecutorService 而不是不断地创建新的 Thread 来轻松地超越它,例如:

public BigInteger compute(int p) {
    ExecutorService pool = Executors.newFixedThreadPool(p);
    Future<BigInteger>[] futures = new Future<BigInteger>[bigCollection.size()];
    int i = 0;

    for(T element : Collection<T> bigCollection) {
        futures[i++] = p.submit(new Callable<BigInteger>() {

            @Override
            public BigInteger call() {
                long big = someVeryComplexCalculation(element, ...); //n!
                return BigInteger.valueOf(big);
            }

        }
    }

    // or with ExecutorCompletionService, but the loop remains I guess
    BigInteger res = BigInteger.ZERO
    for(Future<BigInteger> f : futures)
        res = res.add(f.get());

    return res;
}

然而,此代码的性能并没有优于 SumThread-Collector 解决方案。例如,我也看到了关于 LongAdder 的事情,但是我需要一些加法器来 BigIntegers...

因此我的问题是:同时计算总和的最佳方法是什么?是以上之一还是有完全不同(但更好)的方式?

正如您提到的 LongAdder 添加到 Java-8 中并使用 effectively-final 变量,我假设您使用的是 Java-8。在此版本中,解决您的任务的最佳方法是使用 Stream API:

BigInteger result = bigCollection.parallelStream()
                     .map(e -> BigInteger.valueOf(someVeryComplexCalculation(e, ...)))
                     .reduce(BigInteger.ZERO, BigInteger::add);

您的问题是经典的 map-reduce 任务,您应该转换某个集合的每个元素,然后将各个转换的结果组合成最终结果。 Stream API 能够非常有效地并行处理此类任务,而无需任何手动工作。在 Oracle JDK 中,任务在 common ForkJoinPool pool 中执行,默认情况下创建与您拥有的 CPU 个内核一样多的线程。

你有两个解决方案:

首先,我建议使用 JDK7 中的 Fork-Join Framework 来完成此任务:

您需要实施 RecursiveTask

第二种解决方案 (JDK8) 是使用 pararell 流,正如@tagir-valeev 所提议的那样。

在这两种情况下,这取决于您的目的,以及您使用的 Java 版本。