将工作负载动态分配给 Java 中的多个线程

Dynamically distributing workload to multiple threads in Java

假设我有 5 个线程,它们必须对并行 Monte Carlo 方法程序进行总共 1,000,000 次函数调用。我为 5 个线程中的每一个分配了 1,000,000 / 5 函数调用。然而,经过多次测试(有些测试的迭代次数高达 1 万亿次),我意识到某些线程的完成速度比其他线程快得多。因此,我想为每个线程动态分配工作负载。我的第一个方法涉及一个 AtomicLong 变量,它被设置为初始值,比方说,10 亿。在每次函数调用后,我会将 AtomicLong 减 1。在每次函数调用之前,程序会检查 AtomicLong 是否大于 0,如下所示:

AtomicLong remainingIterations = new AtomicLong(1000000000);
ExecutorService threadPool = Executors.newFixedThreadPool(5);

for (int i = 0; i < 5; i++) {//create 5 threads
   threadPool.submit(new Runnable() {
       public void run() {
          while (remainingIterations.get() > 0) {//do a function call if necessary 
             remainingIterations.decrementAndGet();//decrement # of remaining calls needed
             doOneFunctionCall();//perform a function call
          }
       }
   });
}//more unrelated code is not show (thread shutdown, etc.)

这种方法似乎非常慢,我是否正确使用了 AtomicLong?有没有更好的方法?

am I using AtomicLong correctly?

不完全是。您使用它的方式,两个线程可以每个检查 remainingIterations,每个线程都可以看到 1,然后每个线程递减它,使您总共 -1

至于你的缓慢问题,如果 doOneFunctionCall() 快速完成,你的应用程序可能会因围绕你的 AtomicLong 的锁争用而陷入困境。

ExecutorService 的好处在于它在逻辑上将正在完成的工作与正在执行它的线程分离。您可以提交比线程更多的作业,ExecutorService 将在有能力时尽快执行它们:

ExecutorService threadPool = Executors.newFixedThreadPool(5);

for (int i = 0; i < 1000000; i++) {
   threadPool.submit(new Runnable() {
       public void run() {
           doOneFunctionCall();
       }
   });
}

这可能会在另一个方向上过度平衡您的工作:创建太多短暂的 Runnable 对象。您可以尝试看看如何在分配工作和快速执行工作之间取得最佳平衡:

ExecutorService threadPool = Executors.newFixedThreadPool(5);

for (int i = 0; i < 1000; i++) {
   threadPool.submit(new Runnable() {
       public void run() {
           for (int j = 0; j < 1000; j++) {
               doOneFunctionCall();
           }
       }
   });
}

看看 ForkJoinPool。您正在尝试的是分而治之。在 F/J 中,您将线程数设置为 5。每个线程都有一个待处理任务队列。您可以为每个 thread/queue 均匀设置任务数,当一个线程用完工作时,它会从另一个线程的队列中窃取工作。这样你就不需要 AtomicLong.

有很多使用这个的例子Class。如果您需要更多信息,请告诉我。

避免创建 1B 任务的一种优雅方法是使用同步队列和 ThreadPoolExecutor,这样做提交将被阻塞,直到线程可用。 不过我没有测试实际性能。

BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService threadPool = new ThreadPoolExecutor(5, 5,
    0L, TimeUnit.MILLISECONDS,
    queue);
for (int i = 0; i < 1000000000; i++) {
    threadPool.submit(new Runnable() {
        public void run() {
            doOneFunctionCall();
        }
    });
}