特定的生产者-消费者场景

Particular producer-consumer scenario

我需要实现一个特定的生产者-消费者场景,其中 class 消费者创建两个线程来处理两个端口,每个端口都将一个值存储到相应的端口中,如果值在另一个端口上可用端口,调用 process 以使用两个端口上的值。这是我的尝试:

#include <thread>
#include <atomic>
#include <condition_variable>
#include <vector>
#include <iostream>
#include <mutex>

struct Port {
    int value;
    std::atomic<bool> free;
};

class Consumer {
private:
    Port port1;
    Port port2;
    std::mutex mutex;
    std::condition_variable port1_ready;
    std::condition_variable port2_ready;

    std::vector<std::thread> workers;
    std::atomic<bool> done;
    int count;

public:
    Consumer() : done(false), count(0) {
        port1.free = true;
        port1.value = 0;
        port2.free = true;
        port1.value = 0;

        workers.push_back(std::thread([this]{
            while (!done) {
                write1(rand());
            }
        }));

        workers.push_back(std::thread([this]{
            while (!done) {
                write2(rand());
            }
        }));
    }

    void write1(int value) {
        std::unique_lock lock(mutex);
        port1.value = value;
        port1.free = false;
        std::cout << "port1 stored " << value << std::endl;
        port1_ready.notify_one();
        if (port2.free) {
            port2_ready.wait(lock);
        }
        process("port1");
    }

    void write2(int value) {
        std::unique_lock lock(mutex);
        port2.value = value;
        port2.free = false;
        std::cout << "port2 stored " << value << std::endl;
        port2_ready.notify_one();
        if (port1.free) {
            port1_ready.wait(lock);
        }
        process("port2");
    }

    void process(std::string string) {
        port1.free = true;
        port2.free = true;
        std::cout << string << " consumed " << port1.value << " " << port2.value << std::endl;
        if (count++ == 20) done = true;
    }

    ~Consumer() {
        for (auto& w: workers) {
            w.join();
        }
    }
};

int main(int argc, char** argv) {

    Consumer c{};

    return 0;
}

这是输出:

port1 stored 41
port2 stored 41
port2 consumed 41 41
port2 stored 18467
port1 consumed 41 18467
port1 stored 18467
port2 consumed 18467 18467
port2 stored 6334
port1 consumed 18467 6334
port1 stored 6334
port2 consumed 6334 6334
port2 stored 26500
port1 consumed 6334 26500
port1 stored 26500
port2 consumed 26500 26500
port2 stored 19169
port1 consumed 26500 19169
port1 stored 19169
port2 consumed 19169 19169
port2 stored 15724
port1 consumed 19169 15724
port1 stored 15724
port2 consumed 15724 15724
port2 stored 11478
port1 consumed 15724 11478
port1 stored 11478
port2 consumed 11478 11478
port2 stored 29358
port1 consumed 11478 29358
port1 stored 29358
port2 consumed 29358 29358
port2 stored 26962
port1 consumed 29358 26962
port1 stored 26962
port2 consumed 26962 26962
port2 stored 24464
port1 consumed 26962 24464
port1 stored 24464
port2 consumed 24464 24464
port2 stored 5705
port1 consumed 24464 5705
port1 stored 5705
port2 consumed 5705 5705

有时会成功returns其他人会陷入僵局。

通过注意以下几点可以很容易地观察到您的逻辑错误:

void process(std::string string) {
        port1.free = true;
        port2.free = true;

这清楚地表明您的意图是将现在放置在两个 "port" 中的值视为 "processed"。也就是说,一旦将一个值放入两个 "port" 中,两个值都将变为 "processed",并且两个端口再次为 "free"。

但是,请注意日志的开头:

port1 stored 41
port2 stored 41
port2 consumed 41 41

到目前为止一切顺利,41 被放置在两个端口中,port2 进程结束了 "process"ing 它们。但紧接着:

port2 stored 18467
port1 consumed 41 18467

至此,rails 的事情已经结束了。 41 已经 "process"ed,显然不应该再次 "process"ed。

write1()write2() 的内容打印在两张纸上。用左手的食指追踪write1()线程的执行,用右手的食指追踪write2()线程的执行。

从你的右手开始,追踪 write2() 因为它锁定了互斥锁,并开始它的工作,并发现

    if (port2.free) {

为真,则

        port2_ready.wait(lock);

等待这个条件变量。这会解锁互斥锁,您的左手食指现在可以向前移动了。您的左手食指现在向前移动,直到:

  port2_ready.notify_one();

这会通知另一个线程,它必须等到互斥体解锁,所以它会等待右手食指继续移动:

  if (port1.free) {

这是真的,所以现在第一个线程可以开始了。如果继续,您可以看到两个线程最终如何进入 process(),而不仅仅是其中一个。失败。

这个逻辑从根本上被打破了。有几种方法可以正确地做到这一点,但最简单的方法如下。当任一线程获取互斥量时,它

  • 检查线程拥有的端口是否已经有一个值(从上次调用它开始)。

  • 如果端口已经有值,则等待条件变量直到端口空闲(依赖其他线程清除它)。

  • 如果端口空闲,将其值保存在线程的端口中,然后检查其他线程的端口是否已经有值。如果没有,线程可以 return 并继续其获取下一个值的业务,确保另一个线程将负责处理两个保存的值。

  • 否则两个端口都有值,调用process()处理它们,清除两个端口,并通知另一个线程的条件变量,让它知道它是否正在等待它的端口免费,现在可以免费保存其他线程的下一个值。

即使 port1.freeport2.free 之一为真,您也会调用 process

您可以将代码更改为:

struct Port {
    std::optional<int> value;
};

class Consumer {
private:
    Port port1;
    Port port2;
    std::mutex mutex;
    std::condition_variable port1_ready;
    std::condition_variable port2_ready;
    std::vector<std::thread> workers;
    std::atomic<bool> done;
    int count;

public:
    Consumer() : done(false), count(0) {
        std::random_device rd;

        workers.push_back(std::thread([this, gen = std::mt19937{rd()}] () mutable {
            while (!done) {
                write1(gen());
            }
        }));

        workers.push_back(std::thread([this, gen = std::mt19937{rd()}] () mutable {
            while (!done) {
                write2(gen());
            }
        }));
        port1_ready.notify_one();
        port2_ready.notify_one();
    }

    void write1(int value) {
        std::unique_lock lock(mutex);
        port1_ready.wait(lock, [&](){ return !port1.value; });

        port1.value = value;
        std::cout << "port1 stored " << value << std::endl;
        if (port2.value) {
            process("port1");
        }
    }

    void write2(int value) {
        std::unique_lock lock(mutex);
        port2_ready.wait(lock, [&](){ return !port2.value; });
        port2.value = value;

        std::cout << "port2 stored " << value << std::endl;
        if (port1.value) {
            process("port2");
        }
    }

    void process(std::string string) {
        std::cout << string << " consumed " << *port1.value << " " << *port2.value << std::endl;
        port1.value.reset();
        port2.value.reset();
        port1_ready.notify_one();
        port2_ready.notify_one();
        if (count++ == 20) done = true;
    }

    ~Consumer() {
        for (auto& w: workers) {
            w.join();
        }
    }
};

int main() {
    Consumer c{};
}

Demo