C++ 单生产者多消费者程序偶尔崩溃

C++ Single Producer Multiple Consumer Program Crashes Sporadically

在下面的代码中,我正在创建一个 producer threadn consumer threads,它们从每个专用的 queue 中读取并打印到 stdout.这段代码有时会在语句 consumerQueues[id]->empty() 处崩溃。通过调试器,我看到 consumerQueues[id] 在崩溃时是 0x0。现在在 init() 函数中,我在创建 ith 工作人员 thread 之前创建了 ith 消费者 queue。我不确定为什么 consumerQueues[id] 会保持 0x0。请帮我看看这是怎么回事。

#include <thread>
#include <queue>
#include <memory>
#include <iostream>
#include <mutex>
#include <condition_variable>

class Test
{
private:
    void producer()
    {
        while(true)
        {
            std::string s = "abc";
            for(const auto& q : consumerQueues)
            {
                std::unique_lock<std::mutex> lock(mutex);
                q->push(s);
                condition_variable.notify_all();
            }
        }
    }

    void consumer(int id)
    {
        while (true)
        {
            std::string job;
            {
                std::unique_lock<std::mutex> lock(mutex);
                while(consumerQueues[id]->empty())
                {
                    condition_variable.wait(lock);
                }
                job = consumerQueues[id]->front();
                consumerQueues[id]->pop();
            }
            std::cout << "ID "<< id << " job " << job << std::endl;
        }
    }

    std::mutex mutex;
    std::condition_variable condition_variable;
    std::vector<std::thread> workers;
    std::vector<std::shared_ptr<std::queue<std::string>>> consumerQueues;
    std::thread producerThread;

public:

    Test(const unsigned n_threads):
    workers(std::vector<std::thread>(n_threads))
    {}

    Test(const Test &) = delete;
    Test(Test &&) = delete;

    Test & operator=(const Test &) = delete;
    Test & operator=(Test &&) = delete;

    void init()
    {
        for (unsigned i = 0; i < workers.size(); ++i)
        {
            consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
            workers[i] = std::thread(&Test::consumer, this, i);
        }
       producerThread  = std::thread(&Test::producer, this);
    }

    ~Test()
    {
        producerThread.join();
        for (unsigned i = 0; i < workers.size(); ++i)
        {
            if(workers[i].joinable())
            {
                workers[i].join();
            }
        }
    }
};


int main()
{
    Test t(1000);
    t.init();
    return 0;
}

您的初始化函数正在修改 std::vector 而没有互斥体。这会在线程一个接一个启动的同时修改向量。

要完成这项工作,您的初始化函数需要像这样:

 void init() {
     for (unsigned i = 0; i < workers.size(); ++i) {
            std::unique_lock<std::mutex> lock(mutex);
            consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
            workers[i] = std::thread(&Test::consumer, this, i);
     }
     producerThread  = std::thread(&Test::producer, this);
 }

发件人:http://www.cplusplus.com/reference/vector/vector/push_back/

Data races

The container is modified. If a reallocation happens, all contained elements are modified. Otherwise, no existing element is accessed, and concurrently accessing or modifying them is safe.

当它从 0 个元素开始到 1000 个时,经常会发生重新分配。因此您还可以保留向量的大小以确保不会发生重新分配:

 void init() {
     consumerQueues.reserve(workers.size());
     for (unsigned i = 0; i < workers.size(); ++i) {
            consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
            workers[i] = std::thread(&Test::consumer, this, i);
     }
     producerThread  = std::thread(&Test::producer, this);
 }