C++ 单生产者多消费者程序偶尔崩溃
C++ Single Producer Multiple Consumer Program Crashes Sporadically
在下面的代码中,我正在创建一个 producer thread
和 n
consumer threads
,它们从每个专用的 queue
中读取并打印到 stdout
.这段代码有时会在语句 consumerQueues[id]->empty()
处崩溃。通过调试器,我看到 consumerQueues[id]
在崩溃时是 0x0
。现在在 init()
函数中,我在创建 ith
工作人员 thread
之前创建了 ith
消费者 queue
。我不确定为什么 consumerQueues[id]
会保持 0x0
。请帮我看看这是怎么回事。
#include <thread>
#include <queue>
#include <memory>
#include <iostream>
#include <mutex>
#include <condition_variable>
class Test
{
private:
void producer()
{
while(true)
{
std::string s = "abc";
for(const auto& q : consumerQueues)
{
std::unique_lock<std::mutex> lock(mutex);
q->push(s);
condition_variable.notify_all();
}
}
}
void consumer(int id)
{
while (true)
{
std::string job;
{
std::unique_lock<std::mutex> lock(mutex);
while(consumerQueues[id]->empty())
{
condition_variable.wait(lock);
}
job = consumerQueues[id]->front();
consumerQueues[id]->pop();
}
std::cout << "ID "<< id << " job " << job << std::endl;
}
}
std::mutex mutex;
std::condition_variable condition_variable;
std::vector<std::thread> workers;
std::vector<std::shared_ptr<std::queue<std::string>>> consumerQueues;
std::thread producerThread;
public:
Test(const unsigned n_threads):
workers(std::vector<std::thread>(n_threads))
{}
Test(const Test &) = delete;
Test(Test &&) = delete;
Test & operator=(const Test &) = delete;
Test & operator=(Test &&) = delete;
void init()
{
for (unsigned i = 0; i < workers.size(); ++i)
{
consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
workers[i] = std::thread(&Test::consumer, this, i);
}
producerThread = std::thread(&Test::producer, this);
}
~Test()
{
producerThread.join();
for (unsigned i = 0; i < workers.size(); ++i)
{
if(workers[i].joinable())
{
workers[i].join();
}
}
}
};
int main()
{
Test t(1000);
t.init();
return 0;
}
您的初始化函数正在修改 std::vector 而没有互斥体。这会在线程一个接一个启动的同时修改向量。
要完成这项工作,您的初始化函数需要像这样:
void init() {
for (unsigned i = 0; i < workers.size(); ++i) {
std::unique_lock<std::mutex> lock(mutex);
consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
workers[i] = std::thread(&Test::consumer, this, i);
}
producerThread = std::thread(&Test::producer, this);
}
发件人:http://www.cplusplus.com/reference/vector/vector/push_back/
Data races
The container is modified. If a reallocation happens, all contained
elements are modified. Otherwise, no existing element is accessed, and
concurrently accessing or modifying them is safe.
当它从 0 个元素开始到 1000 个时,经常会发生重新分配。因此您还可以保留向量的大小以确保不会发生重新分配:
void init() {
consumerQueues.reserve(workers.size());
for (unsigned i = 0; i < workers.size(); ++i) {
consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
workers[i] = std::thread(&Test::consumer, this, i);
}
producerThread = std::thread(&Test::producer, this);
}
在下面的代码中,我正在创建一个 producer thread
和 n
consumer threads
,它们从每个专用的 queue
中读取并打印到 stdout
.这段代码有时会在语句 consumerQueues[id]->empty()
处崩溃。通过调试器,我看到 consumerQueues[id]
在崩溃时是 0x0
。现在在 init()
函数中,我在创建 ith
工作人员 thread
之前创建了 ith
消费者 queue
。我不确定为什么 consumerQueues[id]
会保持 0x0
。请帮我看看这是怎么回事。
#include <thread>
#include <queue>
#include <memory>
#include <iostream>
#include <mutex>
#include <condition_variable>
class Test
{
private:
void producer()
{
while(true)
{
std::string s = "abc";
for(const auto& q : consumerQueues)
{
std::unique_lock<std::mutex> lock(mutex);
q->push(s);
condition_variable.notify_all();
}
}
}
void consumer(int id)
{
while (true)
{
std::string job;
{
std::unique_lock<std::mutex> lock(mutex);
while(consumerQueues[id]->empty())
{
condition_variable.wait(lock);
}
job = consumerQueues[id]->front();
consumerQueues[id]->pop();
}
std::cout << "ID "<< id << " job " << job << std::endl;
}
}
std::mutex mutex;
std::condition_variable condition_variable;
std::vector<std::thread> workers;
std::vector<std::shared_ptr<std::queue<std::string>>> consumerQueues;
std::thread producerThread;
public:
Test(const unsigned n_threads):
workers(std::vector<std::thread>(n_threads))
{}
Test(const Test &) = delete;
Test(Test &&) = delete;
Test & operator=(const Test &) = delete;
Test & operator=(Test &&) = delete;
void init()
{
for (unsigned i = 0; i < workers.size(); ++i)
{
consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
workers[i] = std::thread(&Test::consumer, this, i);
}
producerThread = std::thread(&Test::producer, this);
}
~Test()
{
producerThread.join();
for (unsigned i = 0; i < workers.size(); ++i)
{
if(workers[i].joinable())
{
workers[i].join();
}
}
}
};
int main()
{
Test t(1000);
t.init();
return 0;
}
您的初始化函数正在修改 std::vector 而没有互斥体。这会在线程一个接一个启动的同时修改向量。
要完成这项工作,您的初始化函数需要像这样:
void init() {
for (unsigned i = 0; i < workers.size(); ++i) {
std::unique_lock<std::mutex> lock(mutex);
consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
workers[i] = std::thread(&Test::consumer, this, i);
}
producerThread = std::thread(&Test::producer, this);
}
发件人:http://www.cplusplus.com/reference/vector/vector/push_back/
Data races
The container is modified. If a reallocation happens, all contained elements are modified. Otherwise, no existing element is accessed, and concurrently accessing or modifying them is safe.
当它从 0 个元素开始到 1000 个时,经常会发生重新分配。因此您还可以保留向量的大小以确保不会发生重新分配:
void init() {
consumerQueues.reserve(workers.size());
for (unsigned i = 0; i < workers.size(); ++i) {
consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
workers[i] = std::thread(&Test::consumer, this, i);
}
producerThread = std::thread(&Test::producer, this);
}