如何让不同的线程一起填充一个数组?

How to let different threads fill an array together?

假设我有一些任务(Monte Carlo 模拟)我想 运行 并行。我想完成给定数量的任务,但任务花费的时间不同,因此不容易在线程上平均分配工作。另外:最后我需要单个向量(或数组)中的所有模拟结果。

所以我想出了以下方法:

 int Max{1000000};
 //SimResult is some struct with well-defined default value.
 std::vector<SimResult> vec(/*length*/Max);//Initialize with default values of SimResult
 int LastAdded{0};
 void fill(int RandSeed)
 { 
      Simulator sim{RandSeed};
      while(LastAdded < Max)
      {
           // Do some work to bring foo to the desired state
           //The duration of this work is subject to randomness
           vec[LastAdded++] 
                 = sim.GetResult();//Produces SimResult. 
      }
 }
 main()
 { 
       //launch a bunch of std::async that start
       auto fut1 = std::async(fill,1);
       auto fut2 = std::async(fill,2);
       //maybe some more tasks.


      fut1.get();
      fut2.get();
      //do something with the results in vec. 
 }

我猜上面的代码会给出竞态条件。我正在寻找一种高效的方法来避免这种情况。要求:避免竞争条件(填满整个数组,不跳过);最终结果立即在数组中;性能。

阅读各种方法后,原子似乎是一个不错的选择,但我不确定哪种设置对我的情况最有效?甚至不确定 atomic 是否会削减它;也许需要一个保护 LastAdded 的互斥体?

由于您已经知道要处理多少个元素并且从不更改向量的大小,因此最简单的解决方案是让每个线程处理向量中它自己的部分。例如

更新

为了适应变化很大的计算时间,您应该保留当前代码,但通过对所有线程都相同的 std::lock_guard. You will need a std::mutex 避免竞争条件,例如全局变量,或传递对每个线程的互斥量。

void fill(int RandSeed, std::mutex &nextItemMutex)
{ 
      Simulator sim{RandSeed};
      size_t workingIndex;
      while(true)
      {
          {
               // enter critical area
               std::lock_guard<std::mutex> nextItemLock(nextItemMutex);

               // Acquire next item
               if(LastAdded < Max)
               {
                   workingIndex = LastAdded;
                   LastAdded++;
               } 
               else 
               {
                   break;
               }
               // lock is released when nextItemLock goes out of scope
          }

           // Do some work to bring foo to the desired state
           // The duration of this work is subject to randomness
           vec[workingIndex] = sim.GetResult();//Produces SimResult. 
      }
 }

问题在于,同步非常昂贵。但与您 运行 的模拟相比,它可能 并不昂贵,所以它应该不会太差。

版本 2:

为了减少所需的同步量,您可以获取要处理的块,而不是单个项目:

void fill(int RandSeed, std::mutex &nextItemMutex, size_t blockSize)
{ 
      Simulator sim{RandSeed};
      size_t workingIndex;
      while(true)
      {
          {
               std::lock_guard<std::mutex> nextItemLock(nextItemMutex);

               if(LastAdded < Max)
               {
                   workingIndex = LastAdded;
                   LastAdded += blockSize;
               } 
               else 
               {
                   break;
               }
          }
          
          for(size_t i = workingIndex; i < workingIndex + blockSize && i < MAX; i++)
              vec[i] = sim.GetResult();//Produces SimResult. 
      }
 }

简单版

void fill(int RandSeed, size_t partitionStart, size_t partitionEnd)
{ 
      Simulator sim{RandSeed};
      for(size_t i = partitionStart; i < partitionEnd; i++)
      {
           // Do some work to bring foo to the desired state
           // The duration of this work is subject to randomness
           vec[i] = sim.GetResult();//Produces SimResult. 
      }
 }
main()
{ 
    //launch a bunch of std::async that start
    auto fut1 = std::async(fill,1, 0, Max / 2);
    auto fut2 = std::async(fill,2, Max / 2, Max);

    // ...
}

我要说的一件事是,您需要非常小心地使用标准库随机数函数。如果你的 'Simulator' class 创建了一个生成器的实例,你不应该 运行 Monte Carlo 使用同一个对象并行模拟,因为你可能会得到重复的模式运行 之间的随机数,这会给你不准确的结果。

这方面的最佳做法是创建 N 个具有相同属性的模拟器对象,并为每个对象提供不同的随机种子。然后,您可以使用 OpenMP 将这些对象集中到多个线程中,OpenMP 是一种用于科学软件开发的通用并行编程模型。

std::vector<SimResult> generateResults(size_t N_runs, double seed) 
{
    std::vector<SimResult> results(N_runs);
    #pragma omp parallel for
    for(auto i = 0; i < N_runs; i++)
    {
        auto sim = Simulator(seed + i);
        results[i] = sim.GetResult();
    }
}

编辑:使用 OpenMP,您可以选择不同的调度模型,例如在线程之间动态拆分工作。你可以这样做:

#pragma omp parallel for schedule(dynamic, 16)

这将为每个线程提供 16 个项目的块以供一次处理。