C++11:Publisher/Consumer 模式不会完成,除非发布者休眠

C++11: Publisher/Consumer pattern does not finish unless publisher sleeps

在 C++ 中,我尝试使用 condition_variable 获取 publisher/consumer 模式的句柄。这是我在网上看到的大致模板:

#include <iostream>
#include <thread>
#include <string>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <chrono>

using namespace std;

mutex m;
queue<string> que;
condition_variable cond;

void write(string &&msg) {
    unique_lock<mutex> locker(m);
    que.push(msg);
    locker.unlock();
    cond.notify_one();
    this_thread::sleep_for(chrono::milliseconds(1));
}

void read() {
    while (true) {
        unique_lock<mutex> locker(m);
        cond.wait(locker);
        if (!que.empty()) {
            cout << que.front() << endl;
            que.pop();
        }
        locker.unlock();
    }
}

void publisher(string &msg) {
    for (int i = 0; i < 100; ++i)
        write("Publisher: " + msg + ", " + to_string(i));
}

int main() {

    string msg = "Hello";
    thread pub_thread(publisher, std::ref(msg));

    /* The main thread will print the publisher's messages to cout. */
    read();

    /* Make the main thread wait for the publisher to finish. */
    pub_thread.join(); //
    return 0;
}

我不明白的是发布者线程上的 sleep_for 调用。我知道这只是为了模拟 "real life" 场景,在该场景中发布者不会那么快地吐出消息。但是,奇怪的是,如果我注释掉该行,代码不会 运行 完成。这是为什么?

此外,我尝试将sleep_for时间设置为0,效果相同。看来出版根本上需要睡觉,但我不明白为什么。更具体地说,代码应该打印出 100 条消息。如果我让代码休眠 1 毫秒,则会打印所有 100 条消息。如果我不这样做,那么在代码冻结之前我只能看到大约 10 条消息。似乎发生了死锁。

如果有更好的模式可以避免让发布者休眠,则加分...

我知道在实践中你需要有一个停止主线程的策略,就像毒丸一样。我有意省略了这一点以专注于当前的讨论。

编辑

嗯。如果我放入一个块来处理虚假唤醒,那么问题就解决了。但这仍然不能解释原始代码失败的原因。

这是一个改进的读取函数:

void read() {
    while (true) {
        unique_lock<mutex> locker(m);
        cond.wait(locker, [&](){ return !que.empty(); });
        cout << que.front() << endl;
        que.pop();
        locker.unlock();
    }
}

您需要处理发布者发布速度快于消费者消费速度的情况。

发生这种情况时,消费者将错过 condition_variable 个触发器。请记住,notify 调用不会累积。

将消费者更改为在唤醒后消费所有可用消息:

if (!que.empty())while (!que.empty())

像这样:

void read() {
    while (true) {
        unique_lock<mutex> locker(m);
        cond.wait(locker);
        while (!que.empty()) {
            cout << que.front() << endl;
            que.pop();
        }
        locker.unlock();
    }
}

你有两个问题:一个是发布者没有义务屈服于 reader - 或者暂停足够长的时间让 reader 成功锁定互斥体 - 除非你成功了。

第二个是你的reader无论如何都不正确:

void read() {
    while (true) {
        unique_lock<mutex> locker(m);
        cond.wait(locker);
        if (!que.empty()) {
            cout << que.front() << endl;
            que.pop();
        }
    }
}

这假设您将每个推入队列的元素得到一个 "wakeup event",因为它每次唤醒只消耗一个元素。但这不是条件变量的工作方式。

这个序列完全有可能发生:

  1. 出版商添加项目 1
    • 出版商信号 condvar
  2. 出版商添加项目 2
    • 出版商信号 condvar
  3. reader 从 condvar 等待中唤醒
    • reader 消耗物品 1

在这种情况下,发布者将添加 100 个项目,并发出 100 次信号,但 reader 只会唤醒 99 次,因此最多消耗 99 个项目。

正确的代码应该是这样的:

void read() {
    unique_lock<mutex> locker(m);
    while (true) {
        // don't wait if we don't have to
        while (que.empty()) {
            cond.wait(locker);
        }
        // consume everything we can
        while (!que.empty()) {
            cout << que.front() << endl;
            que.pop();
        }
    }
}

使用谓词可以达到大致相同的效果(为了清楚起见,我只是明确地写出了所有逻辑)——第二个 while 不在您的编辑中,但是循环并跳过第一个 while 是获得相同行为的一种稍微昂贵的方法。

此外,无需在每次迭代时都对互斥量进行扰动 - 条件变量已根据需要(解除)锁定它。