关于 std::condition_variables 的两个问题
Two questions on std::condition_variables
我一直在努力弄清楚std::condition_variables,但我对wait()
以及是否使用notify_all
或notify_one
感到特别困惑。
首先,我写了一些代码并附在下面。这是一个简短的解释:Collection
是一个 class,它持有一堆 Counter
对象。这些 Counter
对象有一个 Counter::increment()
方法,需要在所有对象上一遍又一遍地调用该方法。为了加快速度,Collection
还维护了一个线程池来分配工作,并使用其 Collection::increment_all()
方法发送所有工作。
这些线程不需要相互通信,并且 Counter
对象通常比线程多很多。如果一个线程处理的线程比其他线程多 Counter
s 也没关系,只要所有工作都完成即可。向队列中添加工作很容易,只需要在 "main" 线程中完成。据我所知,唯一可能发生的坏事是,如果允许在正在完成的工作中的计数器上调用其他方法(例如 Collection::printCounts
)。
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
class Counter{
private:
int m_count;
public:
Counter() : m_count(0) {}
void increment() {
m_count ++;
}
int getCount() const { return m_count; }
};
class Collection{
public:
Collection(unsigned num_threads, unsigned num_counters)
: m_shutdown(false)
{
// start workers
for(size_t i = 0; i < num_threads; ++i){
m_threads.push_back(std::thread(&Collection::work, this));
}
// intsntiate counters
for(size_t j = 0; j < num_counters; ++j){
m_counters.emplace_back();
}
}
~Collection()
{
m_shutdown = true;
for(auto& t : m_threads){
if(t.joinable()){
t.join();
}
}
}
void printCounts() {
// wait for work to be done
std::unique_lock<std::mutex> lk(m_mtx);
m_work_complete.wait(lk); // q2: do I need a while lop?
// print all current counters
for(const auto& cntr : m_counters){
std::cout << cntr.getCount() << ", ";
}
std::cout << "\n";
}
void increment_all()
{
std::unique_lock<std::mutex> lock(m_mtx);
m_work_complete.wait(lock);
for(size_t i = 0; i < m_counters.size(); ++i){
m_which_counters_have_work.push(i);
}
}
private:
void work()
{
while(!m_shutdown){
bool action = false;
unsigned which_counter;
{
std::unique_lock<std::mutex> lock(m_mtx);
if(m_which_counters_have_work.size()){
which_counter = m_which_counters_have_work.front();
m_which_counters_have_work.pop();
action = true;
}else{
m_work_complete.notify_one(); // q1: notify_all
}
}
if(action){
m_counters[which_counter].increment();
}
}
}
std::vector<Counter> m_counters;
std::vector<std::thread> m_threads;
std::condition_variable m_work_complete;
std::mutex m_mtx;
std::queue<unsigned> m_which_counters_have_work;
bool m_shutdown;
};
int main() {
int num_threads = std::thread::hardware_concurrency()-1;
int num_counters = 10;
Collection myCollection(num_threads, num_counters);
myCollection.printCounts();
myCollection.increment_all();
myCollection.printCounts();
myCollection.increment_all();
myCollection.printCounts();
return 0;
}
我在 Ubuntu 18.04 上用 g++ -std=c++17 -pthread thread_pool.cpp -o tp && ./tp
编译了这个我认为代码实现了所有这些目标,但仍然存在一些问题:
我正在使用 m_work_complete.wait(lk)
来确保在我开始打印所有新计数之前完成工作。 为什么我有时会看到它写在 while
循环中,或者带有第二个参数作为 lambda 谓词函数? These docs 提到虚假唤醒。如果发生虚假唤醒,是否意味着 printCounts
可能会过早打印?如果是这样,我不想要那样。在我开始使用应该存在的数字之前,我只想确保工作队列是空的。
我正在使用 m_work_complete.notify_all
而不是 m_work_complete.notify_one
。我读过 this thread,但我认为这并不重要——只有主线程会因此被阻塞。 使用 notify_one
是否更快,这样其他线程就不必担心了?
很简单:使用 notify()
when;
- 没有理由不止一个线程需要了解这一事件。 (例如,使用
notify()
宣布某个项目的可用性,工作线程将 "consume," 从而使该项目对其他工作人员不可用)
*AND*
- 没有错误的线程可以被唤醒。 (例如,如果所有线程都
wait()
在相同函数的同一行中,您可能是安全的。)
在所有其他情况下使用 notify_all()
。
std::condition_variable
并不是一个真正的条件变量,它更像是一个达到某个条件的同步工具。该条件由程序员决定,并且在每次 condition_variable
唤醒后仍应检查它,因为它可能会在尚未达到所需条件时虚假或“过早”唤醒.
在 POSIX 系统上,condition_variable::wait()
委托给 pthread_cond_wait
, which is susceptible to spurious wake-up (see "Condition Wait Semantics" in the Rationale section). On Linux, pthread_cond_wait
is in turn implemented via a futex
,这又容易受到虚假唤醒的影响。
所以是的,您仍然需要一个标志(由相同的互斥锁保护)或其他一些方法来检查工作是否真正完成。一种方便的方法是将检查包装在谓词中并将其传递给 wait()
函数,该函数将为您循环直到满足谓词。
notify_all
取消阻塞等待条件变量的所有线程; notify_one
只解锁一个(或至少一个,准确地说)。如果有多个等待线程,并且它们是等价的,即任何一个都可以完全处理条件,并且如果条件足以让一个线程继续(如将工作单元提交到线程池),则 notify_one
会更有效率,因为它不会不必要地解除其他线程的阻塞,因为它们只会注意到没有工作要做并返回等待。如果你只有一个服务员,那么 notify_one
和 notify_all
之间就没有区别了。
我一直在努力弄清楚std::condition_variables,但我对wait()
以及是否使用notify_all
或notify_one
感到特别困惑。
首先,我写了一些代码并附在下面。这是一个简短的解释:Collection
是一个 class,它持有一堆 Counter
对象。这些 Counter
对象有一个 Counter::increment()
方法,需要在所有对象上一遍又一遍地调用该方法。为了加快速度,Collection
还维护了一个线程池来分配工作,并使用其 Collection::increment_all()
方法发送所有工作。
这些线程不需要相互通信,并且 Counter
对象通常比线程多很多。如果一个线程处理的线程比其他线程多 Counter
s 也没关系,只要所有工作都完成即可。向队列中添加工作很容易,只需要在 "main" 线程中完成。据我所知,唯一可能发生的坏事是,如果允许在正在完成的工作中的计数器上调用其他方法(例如 Collection::printCounts
)。
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
class Counter{
private:
int m_count;
public:
Counter() : m_count(0) {}
void increment() {
m_count ++;
}
int getCount() const { return m_count; }
};
class Collection{
public:
Collection(unsigned num_threads, unsigned num_counters)
: m_shutdown(false)
{
// start workers
for(size_t i = 0; i < num_threads; ++i){
m_threads.push_back(std::thread(&Collection::work, this));
}
// intsntiate counters
for(size_t j = 0; j < num_counters; ++j){
m_counters.emplace_back();
}
}
~Collection()
{
m_shutdown = true;
for(auto& t : m_threads){
if(t.joinable()){
t.join();
}
}
}
void printCounts() {
// wait for work to be done
std::unique_lock<std::mutex> lk(m_mtx);
m_work_complete.wait(lk); // q2: do I need a while lop?
// print all current counters
for(const auto& cntr : m_counters){
std::cout << cntr.getCount() << ", ";
}
std::cout << "\n";
}
void increment_all()
{
std::unique_lock<std::mutex> lock(m_mtx);
m_work_complete.wait(lock);
for(size_t i = 0; i < m_counters.size(); ++i){
m_which_counters_have_work.push(i);
}
}
private:
void work()
{
while(!m_shutdown){
bool action = false;
unsigned which_counter;
{
std::unique_lock<std::mutex> lock(m_mtx);
if(m_which_counters_have_work.size()){
which_counter = m_which_counters_have_work.front();
m_which_counters_have_work.pop();
action = true;
}else{
m_work_complete.notify_one(); // q1: notify_all
}
}
if(action){
m_counters[which_counter].increment();
}
}
}
std::vector<Counter> m_counters;
std::vector<std::thread> m_threads;
std::condition_variable m_work_complete;
std::mutex m_mtx;
std::queue<unsigned> m_which_counters_have_work;
bool m_shutdown;
};
int main() {
int num_threads = std::thread::hardware_concurrency()-1;
int num_counters = 10;
Collection myCollection(num_threads, num_counters);
myCollection.printCounts();
myCollection.increment_all();
myCollection.printCounts();
myCollection.increment_all();
myCollection.printCounts();
return 0;
}
我在 Ubuntu 18.04 上用 g++ -std=c++17 -pthread thread_pool.cpp -o tp && ./tp
编译了这个我认为代码实现了所有这些目标,但仍然存在一些问题:
我正在使用
m_work_complete.wait(lk)
来确保在我开始打印所有新计数之前完成工作。 为什么我有时会看到它写在while
循环中,或者带有第二个参数作为 lambda 谓词函数? These docs 提到虚假唤醒。如果发生虚假唤醒,是否意味着printCounts
可能会过早打印?如果是这样,我不想要那样。在我开始使用应该存在的数字之前,我只想确保工作队列是空的。我正在使用
m_work_complete.notify_all
而不是m_work_complete.notify_one
。我读过 this thread,但我认为这并不重要——只有主线程会因此被阻塞。 使用notify_one
是否更快,这样其他线程就不必担心了?
很简单:使用 notify()
when;
- 没有理由不止一个线程需要了解这一事件。 (例如,使用
notify()
宣布某个项目的可用性,工作线程将 "consume," 从而使该项目对其他工作人员不可用)
*AND* - 没有错误的线程可以被唤醒。 (例如,如果所有线程都
wait()
在相同函数的同一行中,您可能是安全的。)
在所有其他情况下使用 notify_all()
。
std::condition_variable
并不是一个真正的条件变量,它更像是一个达到某个条件的同步工具。该条件由程序员决定,并且在每次condition_variable
唤醒后仍应检查它,因为它可能会在尚未达到所需条件时虚假或“过早”唤醒.在 POSIX 系统上,
condition_variable::wait()
委托给pthread_cond_wait
, which is susceptible to spurious wake-up (see "Condition Wait Semantics" in the Rationale section). On Linux,pthread_cond_wait
is in turn implemented via afutex
,这又容易受到虚假唤醒的影响。所以是的,您仍然需要一个标志(由相同的互斥锁保护)或其他一些方法来检查工作是否真正完成。一种方便的方法是将检查包装在谓词中并将其传递给
wait()
函数,该函数将为您循环直到满足谓词。notify_all
取消阻塞等待条件变量的所有线程;notify_one
只解锁一个(或至少一个,准确地说)。如果有多个等待线程,并且它们是等价的,即任何一个都可以完全处理条件,并且如果条件足以让一个线程继续(如将工作单元提交到线程池),则notify_one
会更有效率,因为它不会不必要地解除其他线程的阻塞,因为它们只会注意到没有工作要做并返回等待。如果你只有一个服务员,那么notify_one
和notify_all
之间就没有区别了。