通过在共享模式下获取互斥锁来写入共享变量(而不是在独占模式下)

Writing on shared variable by acquiring mutex in shared mode(instead in exclusive mode)

通常使用std::shared_timed_mutex的模式是让'reader'线程以共享方式获取,'writer'线程以独占方式获取。以这种方式,读取和写入不能同时发生,因此程序没有 data-race/undefined 行为。

我想知道如果我改变线程之间的模式是否存在任何问题,即reader线程在获取锁后读取共享变量独占模式并且写线程在共享模式.

获取互斥后写入共享变量
#include <iostream>
#include <thread>
#include <random>
#include <chrono>
#include <shared_mutex>

using namespace std::chrono_literals;

std::shared_timed_mutex lck;
int shared_array[5];

void writerFunc(int index);
void readerFunc();

//main thread
int main() {
  std::thread writer_threads[5];
  for(int i=0; i<5; ++i) {
    writer_threads[i] = std::thread(writerFunc,i);
  }

  while(true) {
    std::this_thread::sleep_for(5s);
    readerFunc();
  }


  for(int i=0; i<5; ++i) {
    writer_threads[i].join();
  }

}

//function executed in writer threads.
//Each writer thread will work on it's own index in the global shared array.
void writerFunc(int index) {
  std::random_device rd;
  std::mt19937 mt(rd());
  std::uniform_real_distribution<double> dist(1.0, 42.0);

  while(true) {
    {
      std::shared_lock<std::shared_timed_mutex> sl(lck);

      //Writing random number in shared variable.
      shared_array[index] += dist(mt);
    }

    std::this_thread::sleep_for(100ms);
  }
}

//function executed in reader thread(main).
void readerFunc() {
  std::lock_guard<std::shared_timed_mutex> sl(lck);
  for(int i=0; i<5 ; ++i) {
    std::cout<<"\nshared_array["<<i<<"]--> "<<shared_array[i];
  }
  std::cout<<"\n\n";
}

由于reader和writer线程不能同时并发访问变量,所以上面的程序不存在数据竞争。 Thread-sanitiser 也没有报告上述程序有任何问题。

我主要对 reader 线程读取的值有一点疑问。

是否由 C++ 标准保证,无论底层 CPU 架构如何,

a) 上面的程序没有任何UB?

b) reader线程只能看到writer线程写入的最新值?

******* 其他详细信息 ********

请注意,上面是一个简短的示例程序,我试图在其中复制我的主要项目设计的特定部分。那边规模大很多。例如那里的数组(不完全是数组,但非常相似)的大小约为 200 万。此外,数据结构不是简单的 int 而是自定义的可序列化结构。

所以想想这样的事情:

custom_serializable_struct shared_variable[2000000];

在我的主程序中,将有 'N' 个写入线程 和一个 单个 reader 线程。大多数时候,编写器线程会工作。由于 N 比 200 万小很多,因此我在编写器线程(我有从示例代码的设计中省略了这方面,因为我觉得它与我的要求无关)。

正如我在上面所说的,大多数时候,编写器线程都会工作。只是偶尔,reader 线程会工作。

Mainly, the program has following requirements:

  1. I've to minimize the wait time of writer threads spent on the mutex while the reader thread is working.
  2. I've to ensure that the reader thread, whenever it works, always gets the latest value written by the writer threads.

所以基本上这就是我的主程序中发生的事情:

N 个作者线程:

while (true) {
// 1. Acquire the shared_timed_mutex in shared mode.
// 2. Acquire the std::atomic_flag of the index, i, on which the thread has to work. This is required, as I mentioned, to prevent data race among writer threads.
// 3. Do changes in the custom_serializable_struct shared_variable[i]
}

1 reader 话题:

while(true) {
// 1. long sleep time.
// 2. Acquire the shared_timed_mutex in exclusive mode.
// 3. read the entire 2 million values. Please note that this read is not done 1 by 1 like in a for loop. It's more like memcpy of the entire memory.
}

unlock_sharedexplicitly synchronizes with subsequent lock calls on the same mutex。这将允许 reader 读取任何写入器写入的数据。同样,lock_shared 与之前对 unlock 的调用同步。因此可以在没有数据竞争的情况下向后使用 shared_mutex(注意:rand 不需要线程安全)。

但是……你应该吗?

互斥体的目的是确保数据完整性,不仅仅是在字节级别(即:数据竞争),而是在更高级别。您有 5 个线程写入 5 个不同的位置。但是……数据的含义是什么?这些数据是否彼此完全不同,或者数据的集合是否具有某种需要保留的意义?也就是说,如果一个线程写入一个值,如果另一个线程尚未写入其值,reader 是否会获得格式错误的信息?

如果这些数据值完全、根本上是分开的,那么就不需要互斥量(至少对于基本类型而言)。您真正在做的只是原子写入。作者可以写入 atomic<T>,reader 将读取这些内容。由于这些值都是不同的并且它们之间没有任何排序问题,因此您不需要阻止 any 线程写入。您需要做的就是确保个人级别的数据完整性 T。无锁原子将比任何基于互斥锁的解决方案快得多。

但是,如果数据具有某种完整性概念,如果线程组共同创建了 reader 线程应完整读取的单个值,那么您正在寻找的是 is a barrier,不是互斥量。该对象允许您查看一组执行代理是否已共同达到某个点。如果有,您可以安全地读取数据。一旦你读完它,就可以安全地释放代理再次给他们写信了。

但是为什么呢?

我想知道改变 reader 和作者在锁定方面的角色背后的动机是什么!你这样做是为了解决什么问题?

在之前的评论中,您提到您不希望作者之间发生争执。

查看代码,我还推断数组中每个 int 的更新都独立于其他但 reader 必须 看到他们一下子就好像他们有 ONE 意义(独占锁的原因)。你还没有提到这个 - 所以假设这是 不是意图

只有一个 reader,但有很多作家,即它看起来倒转为(某些?)刻板印象,即 reader 比作家多。这不应该是主要考虑因素。

应避免传达意想不到的含义和令人惊讶的代码。我同意@Nicol Bolas & 也建议另一种方法:

错误的工具 - 请改用 std::atomic

std::shared_timed_mutex 的反向使用对于未来的维护者(您自己?)来说是一个惊喜。另外,使用它是 reader 误导信息的来源,也是这个问题的原因。 我同意@Nicol Bolas 的观点,原子能解决这个问题:

std::atomic<int> shared_array[5];

void writerFunc(int index) {
   ///Other code
    while(true) {
        //Writing random number in shared variable.
        shared_array[index].fetch_add(dist(mt));

        std::this_thread::sleep_for(100ms);
    }
}

void readerFunc() {
    for (auto& item : shared_array) {
        std::cout << item;
    }
}

更好的抽象 - 使用 libguarded::shared_guarded

悲伤的根源似乎是您应用 std::shared_timed_mutex lck 的级别 - 它控制 整个数组 而您希望有更好的控制每个元素.

我强烈建议您考虑使用 BSD 2 条款“简化”许可下的 shared_guarded of the cs_libguarded

libguarded::shared_guarded<int> shared_array[5];  //Nailed it!

void writeFunc(int index) {
    //Other code
    while (true) {
        {
            auto handle = shared_array[index].lock();
            auto& item = *handle;
            item += dist(mt);
        }
        std::this_thread::sleep_for(100ms);
    }
}

void readerFunc() {
    for (auto& array_element : shared_array) {
        auto item = array_element.lock_shared();
        std::cout << *item;
    }
}

以上不仅确保了共享数据的正确使用,而且确保了 const 正确性,因为它不允许写入 lock_shared。这可以适用于任何数据类型,而不仅仅是 ints - std::atomic 具有的限制。正如@Solomon Slow 指出的那样,内存屏障可能会导致使用原始方法乱序执行的意外结果 - 此代码不存在该问题。 libguarded 还确保对共享数据的访问始终与 正确同步 - 不会意外使用共享数据。

仅供参考,shared_guarded 相当于为每个元素使用互斥量(如下所示),只是 更清洁、const 正确且万无一失.

std::shared_timed_mutex lck[5];  //Don't do it by hand, better use libguarded, as above
int shared_array[5];

我强烈建议优先考虑更清晰的实现而不是任意目标,比如不想有很多 mutexes。如果您不想争用,请消除共享而不是旨在减少互斥量。 问题是共享而不是互斥锁的存在

P.S.: 您将问题标记为 C++14,而 libguarded 需要 C++17。据我检查,libguarded::shared_guarded 应该与 std::shared_timed_mutex.

一起使用