std::shared_ptr 向量指向的数据之间的线程同步

Thread synchronization between data pointed by vectors of std::shared_ptr

我是并发编程的新手,我有一个特定的问题,我无法通过浏览互联网找到解决方案..

基本上我有这种情况(示意图伪代码):

void fun1(std::vector<std::shared_ptr<SmallObj>>& v) {
  for(int i=0; i<v.size(); i++)
    .. read and write on *v[i] ..
}

void fun2(std::vector<std::shared_ptr<SmallObj>>& w) {
  for(int i=0; i<w.size(); i++)
    .. just read on *w[i] ..
}


int main() {

 std::vector<std::shared_ptr<SmallObj>> tot;

 for(int iter=0; iter<iterMax; iter++) {
   for(int nObj=0; nObj<nObjMax; nObj++)
      .. create a SmallObj in the heap and store a shared_ptr in tot ..

   std::vector<std::shared_ptr<SmallObj>> v, w;
   .. copy elements of "tot" in v and w ..

   fun1(v);
   fun2(w);
 }

 return 0;
}

我想做的是同时运行产生两个线程来执行 fun1 和 fun2,但我需要使用某种锁定机制来调节对 SmallObjs 的访问。我该怎么做?在文献中,我只能找到使用互斥锁来锁定对特定对象或部分代码的访问的示例,但不能找到不同对象(在本例中为 v 和 w)的相同指向变量。

非常感谢,抱歉我对此事的无知..

I need to regulate the access to the SmallObjs using some locking mechanism. How can I do it?

为您的数据成员使用 getter 和 setter。使用 std::mutex(或 std::recursive_mutex 取决于是否需要递归锁定)数据成员来保护访问,然后始终使用锁保护器锁定。

示例(另见代码中的注释):

class SmallObject{

    int getID() const{
        std::lock_guard<std::mutex> lck(m_mutex);
        return ....;
    }


    void setID(int id){
        std::lock_guard<std::mutex> lck(m_mutex);
        ....;
    }

    MyType calculate() const{
        std::lock_guard<std::mutex> lck(m_mutex);

        //HERE is a GOTCHA if `m_mutex` is a `std::mutex`
        int k = this->getID();               //Ooopsie... Deadlock

        //To do the above, change the decaration of `m_mutex` from
        //std::mutex, to std::recursive_mutex
}

private:
    ..some data
    mutable std::mutex m_mutex;
};

最简单的解决方案是为整个向量保留一个 std::mutex

#include <mutex>
#include <thread>
#include <vector>

void fun1(std::vector<std::shared_ptr<SmallObj>>& v,std::mutex &mtx) {
  for(int i=0; i<v.size(); i++)
    //Anything you can do before read/write of *v[i]...
   {
       std::lock_guard<std::mutex> guard(mtx);
        //read-write *v[i]
   }
   //Anything you can do after read/write of *v[i]...
}

void fun2(std::vector<std::shared_ptr<SmallObj>>& w,std::mutex &mtx) {
  for(int i=0; i<w.size(); i++) {

   //Anything that can happen before reading *w[i] 
   {
       std::lock_guard<std::mutex> guard(mtx);
        //read *w[i]
    }
    //Anything that can happen after reading *w[i]
}


int main() {

 std::mutex mtx;
 std::vector<std::shared_ptr<SmallObj>> tot;

 for(int iter=0; iter<iterMax; iter++) {
   for(int nObj=0; nObj<nObjMax; nObj++)
      .. create a SmallObj in the heap and store a shared_ptr in tot ..

   std::vector<std::shared_ptr<SmallObj>> v, w;
   .. copy elements of "tot" in v and w ..

   std::thread t1([&v,&mtx] { fun1(v,mtx); });
   std::thread t2([&w,&mtx] { fun2(w,mtx); });

   t1.join();
   t2.join();
 }

 return 0;
}

然而,在 fun1()fun2().

的循环中,您实际上只会在 before/after 块中完成的位上获得任何并行性

您可以通过引入更多锁来进一步提高并行度。 例如,你也许可以只使用 2 个控制奇数和偶数元素的互斥锁:

void fun1(std::vector<int>&v,std::mutex& mtx0,std::mutex& mtx1 ){
    for(size_t i{0};i<v.size();++i){
        {
        std::lock_guard<std::mutex> guard(i%2==0?mtx0:mtx1);
            //read-write *v[i]
        }
    } 
}

fun2() 的格式类似。

您可以通过从向量的两端开始工作或使用 try_lock 并移动到后续元素并在可用时 'coming back' 移动到锁定元素来减少争用。

如果一个函数的迭代执行量比另一个函数大得多,并且在另一个函数完成之前从 'faster' 获取结果有一些优势,那么这可能是最重要的。

备选方案:

显然可以为每个对象添加一个std::mutex。 这是否有效/是否必要将取决于函数 fun1fun2 中实际完成的工作以及这些互斥锁的管理方式。

如果有必要在任何一个循环开始之前进行锁定,实际上并行可能没有任何好处,因为 fun1()fun2() 中的一个实际上会等待另一个完成,而这两个将在系列中生效 运行。