std::queue producer/consumer 的最小互斥量

Minimal mutexes for std::queue producer/consumer

我有两个线程在 std::queue 的生产者端和消费者端工作。队列并不经常满,所以我想避免消费者抓住保护队列变异的互斥体。

是否可以在互斥量之外调用 empty() 然后仅在队列中有内容时才获取互斥量?

例如:

struct MyData{
   int a;
   int b;
};

class SpeedyAccess{
public:
   void AddDataFromThread1(MyData data){
      const std::lock_guard<std::mutex> queueMutexLock(queueAccess);
      workQueue.push(data);
   }

   void CheckFromThread2(){
      if(!workQueue.empty()) // Un-protected access...is this dangerous?
      {
         queueAccess.lock();
         MyData data = workQueue.front();
         workQueue.pop();
         queueAccess.unlock();

         ExpensiveComputation(data);
       }

   }

private:
   void ExpensiveComputation(MyData& data);

   std::queue<MyData> workQueue;
   std::mutex queueAccess;
}

线程 2 执行检查并且不是特别时间关键,但会被调用很多(500/秒?)。线程 1 的时间非常关键,很多东西需要 运行 在那里,但调用频率不高(最多 20 次/秒)。

如果我在 empty() 周围添加一个互斥保护,如果线程 2 到来时队列为空,它不会长时间持有互斥,因此可能不会大受欢迎。然而,由于它被如此频繁地调用,它可能偶尔会在同一时间发生某些东西试图放在后面......这会导致线程 1 中的大量等待吗?

正如上面评论中所写,您应该只在锁定的情况下调用 empty()

但我相信有更好的方法。
您可以使用 std::condition_variable together with a std::mutex 来实现对队列访问的同步,而不需要比您必须的更多地锁定互斥量。

但是 - 使用 std::condition_variable 时,您必须意识到它会受到 虚假唤醒 的影响。您可以在这里阅读:Spurious wakeup - Wikipedia.
您可以在此处查看一些代码示例: Condition variable examples.

使用 std::condition_variable 的正确方法如下所示(带有一些注释)。 这只是展示原理的最小示例。

#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <iostream>

using MyData = int;

std::mutex mtx;
std::condition_variable cond_var;
std::queue<MyData> q;

void producer()
{
    MyData produced_val = 0;
    while (true)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));   // simulate some pause between productions
        ++produced_val;
        std::cout << "produced: " << produced_val << std::endl;
        {
            // Access the Q under the lock:
            std::unique_lock<std::mutex> lck(mtx);
            q.push(produced_val);
            cond_var.notify_all();  // It's not a must to nofity under the lock but it might be more efficient (see @DavidSchwartz's comment below).
        }
    }
}

void consumer()
{
    while (true)
    {
        MyData consumed_val;
        {
            // Access the Q under the lock:
            std::unique_lock<std::mutex> lck(mtx);
            // NOTE: The following call will lock the mutex only when the the condition_varible will cause wakeup
            //       (due to `notify` or spurious wakeup).
            //       Then it will check if the Q is empty.
            //       If empty it will release the lock and continue to wait. 
            //       If not empty, the lock will be kept until out of scope.
            //       See the documentation for std::condition_variable.
            cond_var.wait(lck, []() { return !q.empty(); }); // will loop internally to handle spurious wakeups
            consumed_val = q.front();
            q.pop();
        }
        std::cout << "consumed: " << consumed_val << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(200));    // simulate some calculation
    }
}

int main()
{
    std::thread p(producer);
    std::thread c(consumer);
    while(true) {}
    p.join(); c.join(); // will never happen in our case but to remind us what is needed.
    return 0;
}

一些注意事项:

  1. 在您的实际代码中,none 个线程应该永远 运行。你应该有一些机制来通知他们优雅地退出。
  2. 全局变量(mtxq等)最好是某些上下文class的成员,或者传递给producer()consumer()作为参数。
  3. 为简单起见,此示例假设生产者的生产率始终低于消费者的生产率。在您的真实代码中,您可以使其更通用,方法是让消费者在每次发出 condition_variable 信号时提取 Q 中的所有元素。
  4. 您可以“玩”生产者和消费者的 sleep_for 次来测试各种时序情况。