条件变量基本示例

Condition variable basic example

我正在学习 C++11 中的条件变量,并根据示例代码编写了这个程序。

目标是在向量中累积由生产者生成并由消费者推入向量的前十个自然整数。但是它不起作用,因为例如在某些运行中,向量仅包含 1、7 和 10。

#include <mutex>
#include <condition_variable>
#include<vector>
#include <iostream>
#include <cstdio>

std::mutex mut;
#define MAX     10
int counter;
bool isIncremented = false;
std::vector<int> vec;
std::condition_variable condvar;

void producer() {
    while (counter < MAX) {
        std::lock_guard<std::mutex> lg(mut);
        ++counter;
        isIncremented = true;
        condvar.notify_one();
    }
}

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> ul(mut);
        condvar.wait(ul, [] { return isIncremented; });
        vec.push_back(counter);
        isIncremented = false;
        if (counter >= MAX) {
            break;
        }
    }
}

int main(int argc, char *argv[]) {
    std::thread t1(consumer);
    std::thread t2(producer);
    t2.join();
    t1.join();

    for (auto i : vec) {
        std::cout << i << ", ";
    }
    std::cout << std::endl;
    // Expected output: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
    // Example of actual output: 1, 7, 10,

    std::cout << "Press enter to quit";
    getchar();
    return 0;
}

问题是你只记得你的制作人制作的最后一个数字。你的生产者永远不会等到消费者消费完它生产的东西。如果您的生产者线程在消费者线程到达 运行 之前对其循环进行了多次迭代(这不太可能,因为循环不会做太多事情),消费者将只会看到生产者的最后一个数字产生并只将那个推入向量中......

要解决这个问题,要么使用第二个条件变量让生产者等待某人拿起它产生的最后一个结果,要么使用可以在生产者和消费者之间存储多个结果的东西,或者组合其中……

注意:通知条件变量不是阻塞调用。如果是这样,它就必须要求您交出互斥锁,以便它可以在内部释放它,否则您将陷入僵局。 notify_one() 只会唤醒等待条件变量和 return 的线程之一。被唤醒的线程阻塞的等待调用将在它 returns 之前重新获取互斥锁。在您的情况下,消费者线程被唤醒然后无法重新获取互斥锁并立即再次阻塞的可能性不大,因为您的生产者线程在调用 notify_one() 时仍持有互斥锁。因此,作为一般的经验法则,如果您在调用通知之前持有与条件变量相关联的互斥量,您希望释放它...

旁注,显然您在生产者中使用了 lock_guard<>,但在消费者中使用了 unique_lock。在消费者中,unique_lock似乎也没有专门保护共享资源。

下面是修改后的代码,在生产者和消费者中都使用 unique_lock,防止共享资源 counter

代码在生产者中添加了一个休眠,以便消费者可以收到计数器更改的通知。

输出似乎符合预期。

#include <mutex>
#include <condition_variable>
#include<vector>
#include <iostream>
#include <cstdio>
#include <thread>
#include <chrono>

std::mutex mut;
#define MAX   10
int counter = 0;
bool isIncremented = false;
std::vector<int> vec;
std::condition_variable condvar;

void producer() {
    while (counter < MAX) {
        std::unique_lock<std::mutex> lg(mut);
        ++counter;
        isIncremented = true;
        lg.unlock();

        condvar.notify_one();
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> ul(mut);
            condvar.wait(ul, [] { return isIncremented; });
        vec.push_back(counter);  
        isIncremented = false;
        if (counter >= MAX) {
            break;
        }
        ul.unlock();
    }
}

int main(int argc, char *argv[]) {
    std::thread t1(consumer);
    std::thread t2(producer);

    t2.join();
    t1.join();

    for (auto i : vec) {
        std::cout << i << ", ";
    }
    std::cout << std::endl;
    return 0;
}

使用来自答案的@MichaelKenzel 建议,这里是一个工作示例。 std::queue用于在生产者和消费者之间存储多个结果。

#include<mutex>
#include<condition_variable>
#include<vector>
#include<iostream>
#include<cstdio>
#include<thread>
#include<queue>

std::mutex mut;
#define MAX     10
int counter;
std::queue<int> data_queue;
std::vector<int> vec;
std::condition_variable condvar;

void producer()
{
    while (counter < MAX)
    {
        ++counter;
        std::lock_guard<std::mutex> lg(mut);
        data_queue.push(counter);
        condvar.notify_one();
    }
}

void consumer()
{
    while (true)
    {
        std::unique_lock<std::mutex> ul(mut);
        condvar.wait(ul, [] { return !data_queue.empty(); });

        int data = data_queue.front();
        data_queue.pop();
        ul.unlock();

        vec.push_back(data);
        if (data >= MAX)
        {
            break;
        }
    }
}

int main(int argc, char *argv[])
{
    std::thread t1(consumer);
    std::thread t2(producer);
    t2.join();
    t1.join();

    for (auto i : vec)
    {
        std::cout << i << ", ";
    }
    std::cout << std::endl;

    return 0;
}