如何将元素添加到数组的线程安全函数?

How to thread-safe function that adds elements to array?

我编写了以下接受对象 (element) 的函数,在现有向量 (elements) 中为其创建 space 并通过添加新对象更新向量:

void addElement(const ElementType& element) {
    if (numElements == elements.size()) {
        elements.resize(boost::extents[numElements+1]);
    }

    elements[numElements] = element;

    numElements++;
}

如何让它对 MPI 来说是线程安全的?据我了解,每个线程都知道 elements 的大小,因此我不明白为什么这个函数不是线程安全的。 numElements 在此函数外初始化为零,并且是 elements 向量的大小。

编辑:我正在使用上面写的函数和 mtx 锁定和解锁如下,但最终的 elements 向量仍然只包含第一等级的数据。

    #pragma omp parallel for collapse(3) schedule(static)
    for (long n0 = mgr->startN0; n0 < mgr->startN0 + mgr->localN0; n0++) {
      for (int n1 = 0; n1 < Ν; n1++) {
        for (int n2 = 0; n2 < Ν; n2++) {
          ElementType element;
          std::mutex mtx;
            for(int i=0;i<g_field[n0][n1][n2];i++){
              ... do stuff with element ...
              mtx.lock();
              #pragma omp critical
              addElement(element);
              mtx.unlock();}
       }
     }
   }

编辑:出于速度原因,我不得不稍微更改函数及其使用:

void addElements_MPI(std::vector<ElementType> new_batch,std::mutex& mtx) {

  std::lock_guard<std::mutex> lock(mtx); 
  elements.resize(boost::extents[elements.num_elements()+new_batch.size()]); 

  std::copy(new_batch.begin(), new_batch.end(), elements.begin()+numElements); 

  numElements += new_batch.size();
}
std::mutex mtx;
std::vector<ElementType> all_elements;
#pragma omp parallel for collapse(3) schedule(static)
    for (long n0 = mgr->startN0; n0 < mgr->startN0 + mgr->localN0; n0++) {
      for (int n1 = 0; n1 < Ν; n1++) {
        for (int n2 = 0; n2 < Ν; n2++) {

          ElementType element;

            for(int i=0;i<g_field[n0][n1][n2];i++){
              ... do stuff with element ...
              mtx.lock();
              #pragma omp critical
              all_elements.push_back(element);
              mtx.unlock();}
       }
     }
   }
 mtx.lock();
 #pragma omp critical
 addElement_MPI(all_elements,mtx);
 mtx.unlock();

首先,让我们看看数据竞争何时发生:

  • 两个或多个线程同时访问同一内存位置
  • 至少有一个访问是为了写
  • 线程没有使用任何独占锁来控制它们对该内存的访问

在您的情况下,一个线程可以添加元素,而另一个线程正在读取 elements。此外,多个线程可以同时添加新元素。要控制他们的访问,您应该锁定共享资源,即 elements.

如何在 C++ 中锁定共享资源?

在 C++ 中,您可以使用 std::mutex 来保护共享数据不被多个线程同时访问。当 std::mutex 被一个线程锁定时,其他线程无法访问共享资源。一旦 std::mutex 解锁,其他线程就可以访问该资源。您可以像这样使用 std::mutex::lockstd::mutex::unlock

std::mutex mtx;
ElementsType elements;

...
mtx.lock();

// only one thread accesses this part at a time
// work with elements

mtx.unlock();

开发人员经常忘记一些事情...

因为开发人员经常忘记一些事情...比如解锁被锁定的 std::mutex...C++ 提供 std::lock_guard (in case of C++11) and std::scoped_lock(在 C++17 的情况下)。您可以将 std::lock_guardstd::scoped_lock 视为某种包装器,它们将 std::mutex 实例作为构造函数的参数并将该 std::mutex 实例锁定在构造函数中。当std::lock_guardstd::scoped_lock个实例被销毁时,std::mutex个实例会自动解锁。

void addElement(const ElementType& element) {
    std::lock_guard<std::mutex> lock(mtx); // similar to mtx.lock()

    if (numElements == elements.size()) {
        elements.resize(boost::extents[numElements+1]);
    }

    elements[numElements] = element;

    numElements++;

    // no need for mtx.unlock() since lock instance is now destructed and mutex is automatically unlocked
}

编辑

由于 std::mutex 是所有线程之间的某种通信通道,所有线程应该共享相同的 std::mutex 实例,即相同的 std::mutex 实例应该对所有线程可见线程。

在下面的情况下(我从更新的问题中提取):

for (int n2 = 0; n2 < Ν; n2++) {
    ElementType element;
    std::mutex mtx;
    for(int i=0;i<g_field[n0][n1][n2];i++){
         // ... do stuff with element ...
         mtx.lock();
         #pragma omp critical
         addElement(element);
         mtx.unlock();}
    }
}

std::mutex 被创建 N 次,每个线程创建自己的 std::mutex 实例,这没有任何意义,因为它们无法相互通信。相反,一个 std::mutex 实例应该对所有线程可见,就像 elements 对所有线程可见一样。