将工作负载动态分配给 Java 中的多个线程
Dynamically distributing workload to multiple threads in Java
假设我有 5 个线程,它们必须对并行 Monte Carlo 方法程序进行总共 1,000,000
次函数调用。我为 5 个线程中的每一个分配了 1,000,000 / 5
函数调用。然而,经过多次测试(有些测试的迭代次数高达 1 万亿次),我意识到某些线程的完成速度比其他线程快得多。因此,我想为每个线程动态分配工作负载。我的第一个方法涉及一个 AtomicLong
变量,它被设置为初始值,比方说,10 亿。在每次函数调用后,我会将 AtomicLong
减 1。在每次函数调用之前,程序会检查 AtomicLong
是否大于 0
,如下所示:
AtomicLong remainingIterations = new AtomicLong(1000000000);
ExecutorService threadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {//create 5 threads
threadPool.submit(new Runnable() {
public void run() {
while (remainingIterations.get() > 0) {//do a function call if necessary
remainingIterations.decrementAndGet();//decrement # of remaining calls needed
doOneFunctionCall();//perform a function call
}
}
});
}//more unrelated code is not show (thread shutdown, etc.)
这种方法似乎非常慢,我是否正确使用了 AtomicLong?有没有更好的方法?
am I using AtomicLong correctly?
不完全是。您使用它的方式,两个线程可以每个检查 remainingIterations
,每个线程都可以看到 1
,然后每个线程递减它,使您总共 -1
。
至于你的缓慢问题,如果 doOneFunctionCall()
快速完成,你的应用程序可能会因围绕你的 AtomicLong 的锁争用而陷入困境。
ExecutorService 的好处在于它在逻辑上将正在完成的工作与正在执行它的线程分离。您可以提交比线程更多的作业,ExecutorService 将在有能力时尽快执行它们:
ExecutorService threadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 1000000; i++) {
threadPool.submit(new Runnable() {
public void run() {
doOneFunctionCall();
}
});
}
这可能会在另一个方向上过度平衡您的工作:创建太多短暂的 Runnable 对象。您可以尝试看看如何在分配工作和快速执行工作之间取得最佳平衡:
ExecutorService threadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 1000; i++) {
threadPool.submit(new Runnable() {
public void run() {
for (int j = 0; j < 1000; j++) {
doOneFunctionCall();
}
}
});
}
看看 ForkJoinPool。您正在尝试的是分而治之。在 F/J 中,您将线程数设置为 5。每个线程都有一个待处理任务队列。您可以为每个 thread/queue 均匀设置任务数,当一个线程用完工作时,它会从另一个线程的队列中窃取工作。这样你就不需要 AtomicLong.
有很多使用这个的例子Class。如果您需要更多信息,请告诉我。
避免创建 1B 任务的一种优雅方法是使用同步队列和 ThreadPoolExecutor,这样做提交将被阻塞,直到线程可用。
不过我没有测试实际性能。
BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService threadPool = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
queue);
for (int i = 0; i < 1000000000; i++) {
threadPool.submit(new Runnable() {
public void run() {
doOneFunctionCall();
}
});
}
假设我有 5 个线程,它们必须对并行 Monte Carlo 方法程序进行总共 1,000,000
次函数调用。我为 5 个线程中的每一个分配了 1,000,000 / 5
函数调用。然而,经过多次测试(有些测试的迭代次数高达 1 万亿次),我意识到某些线程的完成速度比其他线程快得多。因此,我想为每个线程动态分配工作负载。我的第一个方法涉及一个 AtomicLong
变量,它被设置为初始值,比方说,10 亿。在每次函数调用后,我会将 AtomicLong
减 1。在每次函数调用之前,程序会检查 AtomicLong
是否大于 0
,如下所示:
AtomicLong remainingIterations = new AtomicLong(1000000000);
ExecutorService threadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {//create 5 threads
threadPool.submit(new Runnable() {
public void run() {
while (remainingIterations.get() > 0) {//do a function call if necessary
remainingIterations.decrementAndGet();//decrement # of remaining calls needed
doOneFunctionCall();//perform a function call
}
}
});
}//more unrelated code is not show (thread shutdown, etc.)
这种方法似乎非常慢,我是否正确使用了 AtomicLong?有没有更好的方法?
am I using AtomicLong correctly?
不完全是。您使用它的方式,两个线程可以每个检查 remainingIterations
,每个线程都可以看到 1
,然后每个线程递减它,使您总共 -1
。
至于你的缓慢问题,如果 doOneFunctionCall()
快速完成,你的应用程序可能会因围绕你的 AtomicLong 的锁争用而陷入困境。
ExecutorService 的好处在于它在逻辑上将正在完成的工作与正在执行它的线程分离。您可以提交比线程更多的作业,ExecutorService 将在有能力时尽快执行它们:
ExecutorService threadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 1000000; i++) {
threadPool.submit(new Runnable() {
public void run() {
doOneFunctionCall();
}
});
}
这可能会在另一个方向上过度平衡您的工作:创建太多短暂的 Runnable 对象。您可以尝试看看如何在分配工作和快速执行工作之间取得最佳平衡:
ExecutorService threadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 1000; i++) {
threadPool.submit(new Runnable() {
public void run() {
for (int j = 0; j < 1000; j++) {
doOneFunctionCall();
}
}
});
}
看看 ForkJoinPool。您正在尝试的是分而治之。在 F/J 中,您将线程数设置为 5。每个线程都有一个待处理任务队列。您可以为每个 thread/queue 均匀设置任务数,当一个线程用完工作时,它会从另一个线程的队列中窃取工作。这样你就不需要 AtomicLong.
有很多使用这个的例子Class。如果您需要更多信息,请告诉我。
避免创建 1B 任务的一种优雅方法是使用同步队列和 ThreadPoolExecutor,这样做提交将被阻塞,直到线程可用。 不过我没有测试实际性能。
BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService threadPool = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
queue);
for (int i = 0; i < 1000000000; i++) {
threadPool.submit(new Runnable() {
public void run() {
doOneFunctionCall();
}
});
}