同时求和的最佳方法
Best way to sum concurrently
我正在尝试计算一些大数字。为了加快计算速度,我想使用多线程。每个线程都要算一个数,最后算一个和。
我曾经看到过使用 SumThread
和 Collector
的东西,如下所示:
public BigInteger compute(int p) {
Collector c = new Collector(p);
for(T element : Collection<T> bigCollection) {
new SumThread(c) {
@Override
protected void doTheJob() {
long big = someVeryComplexCalculation(element, ...); //n!
receive(BigInteger.valueOf(big));
}
}
}
if(collector.isReady())
return collector.getResult();
return null;
}
public class Collector {
private int numberOfProcesses;
private int numberOfAllowedProcesses;
private BigInteger result;
public Collector(int n) {
numberOfAllowedProcesses = n;
numberOfProcesses = 0;
result = BigInteger.ZERO;
}
synchronized public void enter() throws InterruptedException {
if (numberOfProcesses == numberOfAllowedProcesses) wait();
numberOfProcesses++;
}
synchronized public void leave() {
numberOfProcesses--;
notify();
}
synchronized public void register(BigInteger v) {
result = result.add(v);
}
synchronized public boolean isReady() throws InterruptedException {
while (numberOfProcesses > 0) wait();
return true;
}
...
}
public abstract class SumThread extends Thread {
private Collector collector;
public SumThread(Collector c) throws InterruptedException {
collector = c;
collector.enter();
}
abstract protected void doTheJob(); //complex calculations can be done in here
public void receive(BigInteger t) {
collector.register(t);
}
public void run() {
doTheJob();
collector.leave();
}
}
我认为我可以通过使用 ExecutorService
而不是不断地创建新的 Thread
来轻松地超越它,例如:
public BigInteger compute(int p) {
ExecutorService pool = Executors.newFixedThreadPool(p);
Future<BigInteger>[] futures = new Future<BigInteger>[bigCollection.size()];
int i = 0;
for(T element : Collection<T> bigCollection) {
futures[i++] = p.submit(new Callable<BigInteger>() {
@Override
public BigInteger call() {
long big = someVeryComplexCalculation(element, ...); //n!
return BigInteger.valueOf(big);
}
}
}
// or with ExecutorCompletionService, but the loop remains I guess
BigInteger res = BigInteger.ZERO
for(Future<BigInteger> f : futures)
res = res.add(f.get());
return res;
}
然而,此代码的性能并没有优于 SumThread
-Collector
解决方案。例如,我也看到了关于 LongAdder
的事情,但是我需要一些加法器来 BigInteger
s...
因此我的问题是:同时计算总和的最佳方法是什么?是以上之一还是有完全不同(但更好)的方式?
正如您提到的 LongAdder
添加到 Java-8 中并使用 effectively-final 变量,我假设您使用的是 Java-8。在此版本中,解决您的任务的最佳方法是使用 Stream API:
BigInteger result = bigCollection.parallelStream()
.map(e -> BigInteger.valueOf(someVeryComplexCalculation(e, ...)))
.reduce(BigInteger.ZERO, BigInteger::add);
您的问题是经典的 map-reduce 任务,您应该转换某个集合的每个元素,然后将各个转换的结果组合成最终结果。 Stream API 能够非常有效地并行处理此类任务,而无需任何手动工作。在 Oracle JDK 中,任务在 common ForkJoinPool pool 中执行,默认情况下创建与您拥有的 CPU 个内核一样多的线程。
你有两个解决方案:
首先,我建议使用 JDK7 中的 Fork-Join Framework 来完成此任务:
- https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
- http://www.javacodegeeks.com/2013/02/java-7-forkjoin-framework-example.html
您需要实施 RecursiveTask
第二种解决方案 (JDK8) 是使用 pararell 流,正如@tagir-valeev 所提议的那样。
在这两种情况下,这取决于您的目的,以及您使用的 Java 版本。
我正在尝试计算一些大数字。为了加快计算速度,我想使用多线程。每个线程都要算一个数,最后算一个和。
我曾经看到过使用 SumThread
和 Collector
的东西,如下所示:
public BigInteger compute(int p) {
Collector c = new Collector(p);
for(T element : Collection<T> bigCollection) {
new SumThread(c) {
@Override
protected void doTheJob() {
long big = someVeryComplexCalculation(element, ...); //n!
receive(BigInteger.valueOf(big));
}
}
}
if(collector.isReady())
return collector.getResult();
return null;
}
public class Collector {
private int numberOfProcesses;
private int numberOfAllowedProcesses;
private BigInteger result;
public Collector(int n) {
numberOfAllowedProcesses = n;
numberOfProcesses = 0;
result = BigInteger.ZERO;
}
synchronized public void enter() throws InterruptedException {
if (numberOfProcesses == numberOfAllowedProcesses) wait();
numberOfProcesses++;
}
synchronized public void leave() {
numberOfProcesses--;
notify();
}
synchronized public void register(BigInteger v) {
result = result.add(v);
}
synchronized public boolean isReady() throws InterruptedException {
while (numberOfProcesses > 0) wait();
return true;
}
...
}
public abstract class SumThread extends Thread {
private Collector collector;
public SumThread(Collector c) throws InterruptedException {
collector = c;
collector.enter();
}
abstract protected void doTheJob(); //complex calculations can be done in here
public void receive(BigInteger t) {
collector.register(t);
}
public void run() {
doTheJob();
collector.leave();
}
}
我认为我可以通过使用 ExecutorService
而不是不断地创建新的 Thread
来轻松地超越它,例如:
public BigInteger compute(int p) {
ExecutorService pool = Executors.newFixedThreadPool(p);
Future<BigInteger>[] futures = new Future<BigInteger>[bigCollection.size()];
int i = 0;
for(T element : Collection<T> bigCollection) {
futures[i++] = p.submit(new Callable<BigInteger>() {
@Override
public BigInteger call() {
long big = someVeryComplexCalculation(element, ...); //n!
return BigInteger.valueOf(big);
}
}
}
// or with ExecutorCompletionService, but the loop remains I guess
BigInteger res = BigInteger.ZERO
for(Future<BigInteger> f : futures)
res = res.add(f.get());
return res;
}
然而,此代码的性能并没有优于 SumThread
-Collector
解决方案。例如,我也看到了关于 LongAdder
的事情,但是我需要一些加法器来 BigInteger
s...
因此我的问题是:同时计算总和的最佳方法是什么?是以上之一还是有完全不同(但更好)的方式?
正如您提到的 LongAdder
添加到 Java-8 中并使用 effectively-final 变量,我假设您使用的是 Java-8。在此版本中,解决您的任务的最佳方法是使用 Stream API:
BigInteger result = bigCollection.parallelStream()
.map(e -> BigInteger.valueOf(someVeryComplexCalculation(e, ...)))
.reduce(BigInteger.ZERO, BigInteger::add);
您的问题是经典的 map-reduce 任务,您应该转换某个集合的每个元素,然后将各个转换的结果组合成最终结果。 Stream API 能够非常有效地并行处理此类任务,而无需任何手动工作。在 Oracle JDK 中,任务在 common ForkJoinPool pool 中执行,默认情况下创建与您拥有的 CPU 个内核一样多的线程。
你有两个解决方案:
首先,我建议使用 JDK7 中的 Fork-Join Framework 来完成此任务:
- https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
- http://www.javacodegeeks.com/2013/02/java-7-forkjoin-framework-example.html
您需要实施 RecursiveTask
第二种解决方案 (JDK8) 是使用 pararell 流,正如@tagir-valeev 所提议的那样。
在这两种情况下,这取决于您的目的,以及您使用的 Java 版本。