关于 std::condition_variables 的两个问题

Two questions on std::condition_variables

我一直在努力弄清楚std::condition_variables,但我对wait()以及是否使用notify_allnotify_one感到特别困惑。

首先,我写了一些代码并附在下面。这是一个简短的解释:Collection 是一个 class,它持有一堆 Counter 对象。这些 Counter 对象有一个 Counter::increment() 方法,需要在所有对象上一遍又一遍地调用该方法。为了加快速度,Collection 还维护了一个线程池来分配工作,并使用其 Collection::increment_all() 方法发送所有工作。

这些线程不需要相互通信,并且 Counter 对象通常比线程多很多。如果一个线程处理的线程比其他线程多 Counters 也没关系,只要所有工作都完成即可。向队列中添加工作很容易,只需要在 "main" 线程中完成。据我所知,唯一可能发生的坏事是,如果允许在正在完成的工作中的计数器上调用其他方法(例如 Collection::printCounts)。

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>


class Counter{
private:
    int m_count;
public:
    Counter() : m_count(0) {}
    void increment() { 
        m_count ++; 
    }
    int getCount() const { return m_count; }
};


class Collection{
public:
    Collection(unsigned num_threads, unsigned num_counters) 
    : m_shutdown(false)
    {
        // start workers
        for(size_t i = 0; i < num_threads; ++i){
            m_threads.push_back(std::thread(&Collection::work, this)); 
        }

        // intsntiate counters
        for(size_t j = 0; j < num_counters; ++j){
            m_counters.emplace_back();
        }
    }

    ~Collection() 
    { 
        m_shutdown = true;
        for(auto& t : m_threads){
            if(t.joinable()){
                t.join();
            }
        }
    }

    void printCounts() {

        // wait for work to be done
        std::unique_lock<std::mutex> lk(m_mtx);
        m_work_complete.wait(lk); // q2: do I need a while lop?  

        // print all current counters
        for(const auto& cntr : m_counters){
            std::cout << cntr.getCount() << ", ";
        }
        std::cout << "\n";
    }

    void increment_all() 
    {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_work_complete.wait(lock);
        for(size_t i = 0; i < m_counters.size(); ++i){
            m_which_counters_have_work.push(i);
        }

    }


private:    
    void work()
    {
        while(!m_shutdown){

            bool action = false;
            unsigned which_counter;
            {
                std::unique_lock<std::mutex> lock(m_mtx);
                if(m_which_counters_have_work.size()){
                    which_counter = m_which_counters_have_work.front();
                    m_which_counters_have_work.pop();
                    action = true;
                }else{
                    m_work_complete.notify_one(); // q1: notify_all
                }
            }

            if(action){
                m_counters[which_counter].increment();
            }
        }   
    }



    std::vector<Counter> m_counters;
    std::vector<std::thread> m_threads;
    std::condition_variable m_work_complete;
    std::mutex m_mtx;
    std::queue<unsigned> m_which_counters_have_work;
    bool m_shutdown;

};

int main() {

    int num_threads = std::thread::hardware_concurrency()-1;
    int num_counters = 10;
    Collection myCollection(num_threads, num_counters);

    myCollection.printCounts();
    myCollection.increment_all();
    myCollection.printCounts();

    myCollection.increment_all();
    myCollection.printCounts();

    return 0;
}

我在 Ubuntu 18.04 上用 g++ -std=c++17 -pthread thread_pool.cpp -o tp && ./tp 编译了这个我认为代码实现了所有这些目标,但仍然存在一些问题:

  1. 我正在使用 m_work_complete.wait(lk) 来确保在我开始打印所有新计数之前完成工作。 为什么我有时会看到它写在 while 循环中,或者带有第二个参数作为 lambda 谓词函数? These docs 提到虚假唤醒。如果发生虚假唤醒,是否意味着 printCounts 可能会过早打印?如果是这样,我不想要那样。在我开始使用应该存在的数字之前,我只想确保工作队列是空的。

  2. 我正在使用 m_work_complete.notify_all 而不是 m_work_complete.notify_one。我读过 this thread,但我认为这并不重要——只有主线程会因此被阻塞。 使用 notify_one 是否更快,这样其他线程就不必担心了?

很简单:使用 notify() when;

  1. 没有理由不止一个线程需要了解这一事件。 (例如,使用 notify() 宣布某个项目的可用性,工作线程将 "consume," 从而使该项目对其他工作人员不可用)
    *AND*
  2. 没有错误的线程可以被唤醒。 (例如,如果所有线程都wait()在相同函数的同一行中,您可能是安全的。)

在所有其他情况下使用 notify_all()

  1. std::condition_variable并不是一个真正的条件变量,它更像是一个达到某个条件的同步工具。该条件由程序员决定,并且在每次 condition_variable 唤醒后仍应检查它,因为它可能会在尚未达到所需条件时虚假或“过早”唤醒.

    在 POSIX 系统上,condition_variable::wait() 委托给 pthread_cond_wait, which is susceptible to spurious wake-up (see "Condition Wait Semantics" in the Rationale section). On Linux, pthread_cond_wait is in turn implemented via a futex,这又容易受到虚假唤醒的影响。

    所以是的,您仍然需要一个标志(由相同的互斥锁保护)或其他一些方法来检查工作是否真正完成。一种方便的方法是将检查包装在谓词中并将其传递给 wait() 函数,该函数将为您循环直到满足谓词。

  2. notify_all 取消阻塞等待条件变量的所有线程; notify_one 只解锁一个(或至少一个,准确地说)。如果有多个等待线程,并且它们是等价的,即任何一个都可以完全处理条件,并且如果条件足以让一个线程继续(如将工作单元提交到线程池),则 notify_one 会更有效率,因为它不会不必要地解除其他线程的阻塞,因为它们只会注意到没有工作要做并返回等待。如果你只有一个服务员,那么 notify_onenotify_all 之间就没有区别了。