C++ 中的多线程 Producer/Consumer
Multithreaded Producer/Consumer in C++
我正在研究多线程并编写了一个基础的 producer/consumer。下面写的 producer/consumer 我有两个问题。 1) 即使将消费者睡眠时间设置得比生产者睡眠时间短,生产者似乎仍然执行得更快。 2)在消费者中,我已经复制了生产者完成添加到队列但队列中仍有元素的情况下的代码。对更好的代码结构有什么建议吗?
#include <iostream>
#include <queue>
#include <mutex>
class App {
private:
std::queue<int> m_data;
bool m_bFinished;
std::mutex m_Mutex;
int m_ConsumerSleep;
int m_ProducerSleep;
int m_QueueSize;
public:
App(int &MaxQueue) :m_bFinished(false), m_ConsumerSleep(1), m_ProducerSleep(5), m_QueueSize(MaxQueue){}
void Producer() {
for (int i = 0; i < m_QueueSize; ++i) {
std::lock_guard<std::mutex> guard(m_Mutex);
m_data.push(i);
std::cout << "Producer Thread, queue size: " << m_data.size() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(m_ProducerSleep));
}
m_bFinished = true;
}
void Consumer() {
while (!m_bFinished) {
if (m_data.size() > 0) {
std::lock_guard<std::mutex> guard(m_Mutex);
std::cout << "Consumer Thread, queue element: " << m_data.front() << " size: " << m_data.size() << std::endl;
m_data.pop();
}
else {
std::cout << "No elements, skipping" << std::endl;
}
std::this_thread::sleep_for(std::chrono::seconds(m_ConsumerSleep));
}
while (m_data.size() > 0) {
std::lock_guard<std::mutex> guard(m_Mutex);
std::cout << "Emptying remaining elements " << m_data.front() << std::endl;
m_data.pop();
std::this_thread::sleep_for(std::chrono::seconds(m_ConsumerSleep));
}
}
};
int main()
{
int QueueElements = 10;
App app(QueueElements);
std::thread consumer_thread(&App::Consumer, &app);
std::thread producer_thread(&App::Producer, &app);
producer_thread.join();
consumer_thread.join();
std::cout << "loop exited" << std::endl;
return 0;
}
首先,您应该使用条件变量而不是对消费者的延迟。这样,消费者线程只有在队列不为空、生产者通知时才会被唤醒。
也就是说,您的生产者调用更频繁的原因是生产者线程上的延迟。它在持有互斥量的同时执行,因此消费者在延迟结束之前永远不会执行。您应该在调用 sleep_for
之前释放互斥量:
for (int i = 0; i < m_QueueSize; ++i) {
/* Introduce a scope to release the mutex before sleeping*/
{
std::lock_guard<std::mutex> guard(m_Mutex);
m_data.push(i);
std::cout << "Producer Thread, queue size: " << m_data.size() << std::endl;
} // Mutex is released here
std::this_thread::sleep_for(std::chrono::seconds(m_ProducerSleep));
}
你应该使用 condition_variable。不要对线程使用睡眠。
主要方案:
生产者将价值锁定并发出信号 condition_variable.
消费者在锁定条件变量的情况下等待并检查谓词以防止虚假唤醒。
我的版本:
#include <iostream>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <atomic>
class App {
private:
std::queue<int> m_data;
std::atomic_bool m_bFinished;
std::mutex m_Mutex;
std::condition_variable m_cv;
int m_QueueSize;
public:
App(int MaxQueue)
: m_bFinished(false)
, m_QueueSize(MaxQueue)
{}
void Producer()
{
for (int i = 0; i < m_QueueSize; ++i)
{
{
std::unique_lock<std::mutex> lock(m_Mutex);
m_data.push(i);
}
m_cv.notify_one();
std::cout << "Producer Thread, queue size: " << m_data.size() << std::endl;
}
m_bFinished = true;
}
void Consumer()
{
do
{
std::unique_lock<std::mutex> lock(m_Mutex);
while (m_data.empty())
{
m_cv.wait(lock, [&](){ return !m_data.empty(); }); // predicate an while loop - protection from spurious wakeups
}
while(!m_data.empty()) // consume all elements from queue
{
std::cout << "Consumer Thread, queue element: " << m_data.front() << " size: " << m_data.size() << std::endl;
m_data.pop();
}
} while(!m_bFinished);
}
};
int main()
{
int QueueElements = 10;
App app(QueueElements);
std::thread consumer_thread(&App::Consumer, &app);
std::thread producer_thread(&App::Producer, &app);
producer_thread.join();
consumer_thread.join();
std::cout << "loop exited" << std::endl;
return 0;
}
另请注意,当您处理并发线程时,最好使用 atomic 作为结束标志,因为理论上 m_bFinished
的值将存储在缓存行中,如果生产者线程中没有缓存失效,消费者线程看不到更改的值。 Atomics 有内存栅栏,这保证了该值将为其他线程更新。
您也可以在 memory_order 页面上查看。
我正在研究多线程并编写了一个基础的 producer/consumer。下面写的 producer/consumer 我有两个问题。 1) 即使将消费者睡眠时间设置得比生产者睡眠时间短,生产者似乎仍然执行得更快。 2)在消费者中,我已经复制了生产者完成添加到队列但队列中仍有元素的情况下的代码。对更好的代码结构有什么建议吗?
#include <iostream>
#include <queue>
#include <mutex>
class App {
private:
std::queue<int> m_data;
bool m_bFinished;
std::mutex m_Mutex;
int m_ConsumerSleep;
int m_ProducerSleep;
int m_QueueSize;
public:
App(int &MaxQueue) :m_bFinished(false), m_ConsumerSleep(1), m_ProducerSleep(5), m_QueueSize(MaxQueue){}
void Producer() {
for (int i = 0; i < m_QueueSize; ++i) {
std::lock_guard<std::mutex> guard(m_Mutex);
m_data.push(i);
std::cout << "Producer Thread, queue size: " << m_data.size() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(m_ProducerSleep));
}
m_bFinished = true;
}
void Consumer() {
while (!m_bFinished) {
if (m_data.size() > 0) {
std::lock_guard<std::mutex> guard(m_Mutex);
std::cout << "Consumer Thread, queue element: " << m_data.front() << " size: " << m_data.size() << std::endl;
m_data.pop();
}
else {
std::cout << "No elements, skipping" << std::endl;
}
std::this_thread::sleep_for(std::chrono::seconds(m_ConsumerSleep));
}
while (m_data.size() > 0) {
std::lock_guard<std::mutex> guard(m_Mutex);
std::cout << "Emptying remaining elements " << m_data.front() << std::endl;
m_data.pop();
std::this_thread::sleep_for(std::chrono::seconds(m_ConsumerSleep));
}
}
};
int main()
{
int QueueElements = 10;
App app(QueueElements);
std::thread consumer_thread(&App::Consumer, &app);
std::thread producer_thread(&App::Producer, &app);
producer_thread.join();
consumer_thread.join();
std::cout << "loop exited" << std::endl;
return 0;
}
首先,您应该使用条件变量而不是对消费者的延迟。这样,消费者线程只有在队列不为空、生产者通知时才会被唤醒。
也就是说,您的生产者调用更频繁的原因是生产者线程上的延迟。它在持有互斥量的同时执行,因此消费者在延迟结束之前永远不会执行。您应该在调用 sleep_for
之前释放互斥量:
for (int i = 0; i < m_QueueSize; ++i) {
/* Introduce a scope to release the mutex before sleeping*/
{
std::lock_guard<std::mutex> guard(m_Mutex);
m_data.push(i);
std::cout << "Producer Thread, queue size: " << m_data.size() << std::endl;
} // Mutex is released here
std::this_thread::sleep_for(std::chrono::seconds(m_ProducerSleep));
}
你应该使用 condition_variable。不要对线程使用睡眠。
主要方案: 生产者将价值锁定并发出信号 condition_variable.
消费者在锁定条件变量的情况下等待并检查谓词以防止虚假唤醒。
我的版本:
#include <iostream>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <atomic>
class App {
private:
std::queue<int> m_data;
std::atomic_bool m_bFinished;
std::mutex m_Mutex;
std::condition_variable m_cv;
int m_QueueSize;
public:
App(int MaxQueue)
: m_bFinished(false)
, m_QueueSize(MaxQueue)
{}
void Producer()
{
for (int i = 0; i < m_QueueSize; ++i)
{
{
std::unique_lock<std::mutex> lock(m_Mutex);
m_data.push(i);
}
m_cv.notify_one();
std::cout << "Producer Thread, queue size: " << m_data.size() << std::endl;
}
m_bFinished = true;
}
void Consumer()
{
do
{
std::unique_lock<std::mutex> lock(m_Mutex);
while (m_data.empty())
{
m_cv.wait(lock, [&](){ return !m_data.empty(); }); // predicate an while loop - protection from spurious wakeups
}
while(!m_data.empty()) // consume all elements from queue
{
std::cout << "Consumer Thread, queue element: " << m_data.front() << " size: " << m_data.size() << std::endl;
m_data.pop();
}
} while(!m_bFinished);
}
};
int main()
{
int QueueElements = 10;
App app(QueueElements);
std::thread consumer_thread(&App::Consumer, &app);
std::thread producer_thread(&App::Producer, &app);
producer_thread.join();
consumer_thread.join();
std::cout << "loop exited" << std::endl;
return 0;
}
另请注意,当您处理并发线程时,最好使用 atomic 作为结束标志,因为理论上 m_bFinished
的值将存储在缓存行中,如果生产者线程中没有缓存失效,消费者线程看不到更改的值。 Atomics 有内存栅栏,这保证了该值将为其他线程更新。
您也可以在 memory_order 页面上查看。