具有共享缓冲区的两个等待线程 (producer/consumer)

Two waiting threads (producer/consumer) with a shared buffer

我试图让一堆生产者线程等到缓冲区有空间容纳一个项目,然后它尽可能地将项目放入缓冲区,如果没有更多空间则返回休眠。

同时应该有一堆消费者线程等待直到缓冲区中有东西,然后它会尽可能地从缓冲区中取出东西,如果它是空的则返回休眠。

在伪代码中,这就是我正在做的,但我得到的只是死锁。

condition_variable cvAdd;
condition_variable cvTake;
mutex smtx;

ProducerThread(){
    while(has something to produce){

         unique_lock<mutex> lock(smtx);
         while(buffer is full){
            cvAdd.wait(lock);
         }
         AddStuffToBuffer();
         cvTake.notify_one();
    }
}

ConsumerThread(){

     while(should be taking data){

        unique_lock<mutex> lock(smtx);
        while( buffer is empty ){
            cvTake.wait(lock);
        }   
        TakeStuffFromBuffer();
        if(BufferIsEmpty)
        cvAdd.notify_one();
     }

}

您的生产者和消费者都尝试锁定互斥锁,但没有线程解锁互斥锁。这意味着第一个获取锁的线程会持有它,而另一个线程永远不会运行。

考虑将互斥锁调用移动到每个线程执行其操作之前,然后在每个线程执行其操作(AddStuffTobuffer() 或 TakeStuffFromBuffer())后解锁。

我之前回答过这个问题,但我有点跑题了,因为我目前正在掌握 mutexlock_guard 等的基本机制和行为。我已经一直在看一些关于这个主题的视频,我目前正在看的一个视频实际上与 locking 相反,因为该视频展示了如何实现 LockFreeQueue 使用循环缓冲区或环形缓冲区,两个指针,并使用 atomic 而不是 mutex。现在,对于您目前的情况,atomicLockFreeQueue 无法回答您的问题,但我从该视频中学到的是循环缓冲区的想法。

因为您的生产者/消费者线程将共享同一个内存池。如果生产者线程与消费者线程的比例为 1 比 1,则很容易跟踪数组中的每个索引或每个指针。但是,当您拥有多对多时,事情确实会变得有点复杂。

可以做的事情之一是,如果您将缓冲区的大小限制为 N 个对象,您实际上可能希望将其创建为 N+1。一个额外的空space,这将有助于减轻在多个生产者和消费者之间共享的环形缓冲区结构中的一些复杂性。


取下图:

p = 生产者指数,c = 消费者指数,N 表示 [ ] 指数 spaces 的数量。 N = 5.

一对一

 p                N = 5
[ ][ ][ ][ ][ ]
 c

这里p和c都== 0。这表示缓冲区是空的。假设生产者在 c 收到任何东西之前填充缓冲区

             p    N = 5
[x][x][x][x][x]
 c

在这种情况下,缓冲区已满,p 必须等待空 space。 c 现在可以获取了。

             p     N = 5
[ ][x][x][x][x]
    c         

此处 c 在 [0] 处获取了对象,并将其索引推进到 1。 P 现在可以绕着环形缓冲区转圈了。

这很容易用一个 p & c 来跟踪。现在让我们探索多个消费者和一个生产者

一对多

 p                 N = 5
[ ][ ][ ][ ][ ]
c1
c2

此处p索引=0,c1&c2索引=0,环形缓冲区为空

             p     N = 5
[x][x][x][x][x]
c1
c2

现在 p 必须等待 c1 或 c2 在 [0] 获取项目才能写入

             p     N = 5
[ ][ ][x][x][x]
    c1 c2

这里不明显c1或c2是否获得了[0]或1,但都成功获得了一个物品。两者都增加了索引计数器。以上似乎表明 c1 从 [0] 增加到 1。然后位于 [0] 的 c2 也必须增加索引计数器,但它已经从 0 更改为 1,因此 c2 将其增加到 2。

如果我们假设 p == 0 && c1 || c2 == 0 缓冲区为空,则此处存在死锁情况。看看这里的情况。

 p               N = 5  // P hasn't written yet but has advanced 
[ ][ ][ ][ ][x]  // 1 Item is left
           c1  // Both c1 & c2 have index to same item.
           c2  // c1 acquires it and so does c2 but one of them finishes first and then increments the counter. Now the buffer is empty and looks like this:

 p                N = 5
[ ][ ][ ][ ][ ]
c1          c2    // p index = 0 and c1 = 0 represents empty buffer.
                  // c2 is trying to read [4]

这会导致死锁。

多对一

 p1
 p2                N = 5
[ ][ ][ ][ ][ ]
 c1

这里有多个生产者可以为单个消费者写入缓冲区。如果他们交错:

p1 writes to [0] increments counter
p2 writes to [0] increments counter

   p1 p2
[x][ ][ ][ ][ ]
c1

这将导致缓冲区为空 space。生产者互相干扰。这里需要互斥

具有多对多的思想;您需要考虑并结合一对多和多对一的上述两个功能。您需要为您的消费者使用一个互斥锁,为您的生产者使用一个互斥锁,尝试对两者使用相同的互斥锁会给您带来可能导致无法预料的死锁的问题。您必须确保检查所有案例,并确保在适当的时间 - 地点锁定它们。也许这几个视频可以帮助您了解更多。


伪代码:可能如下所示:

condition_variable cvAdd;
condition_variable cvTake;
mutex consumerMutex;
mutex producerMutex;

ProducerThread(){
    while( has something to produce ) {    
         unique_lock<mutex> lock(producerMutex);
         while(buffer is full){
            cvAdd.wait(lock);
         }
         AddStuffToBuffer();
         cvTake.notify_one();
    }
}

ConsumerThread() {    
     while( should be taking data ) {    
        unique_lock<mutex> lock(consumerMutex);
        while( buffer is empty ){
            cvTake.wait(lock);
        }   
        TakeStuffFromBuffer();
        if(BufferIsEmpty)
        cvAdd.notify_one();
     }    
}

这里唯一的区别是使用了 2 个互斥锁,而不是生产者和消费者都尝试使用同一个互斥锁。共享的是内存;但是您不想在两者之间共享计数器或指针到内存池中。多个生产者使用同一个互斥锁是可以的,多个消费者使用同一个互斥锁也是可以的,但是让消费者和生产者都使用同一个互斥锁可能是你的根本问题。

根据您的查询查看此示例。在这种情况下,一个 condition_variable 就足够了。

#include "conio.h"
#include <thread>
#include <mutex>
#include <queue>
#include <chrono>
#include <iostream>
#include <condition_variable>

using namespace std;

mutex smtx;
condition_variable cvAdd;
bool running ;
queue<int> buffer;

void ProducerThread(){
    static int data = 0;
    while(running){
        unique_lock<mutex> lock(smtx);
        if( !running) return;
        buffer.push(data++);
        lock.unlock();
        cvAdd.notify_one();
        this_thread::sleep_for(chrono::milliseconds(300));
    }
}

void ConsumerThread(){

     while(running){

        unique_lock<mutex> lock(smtx);
        cvAdd.wait(lock,[](){ return !running || !buffer.empty(); });
         if( !running) return;
        while( !buffer.empty() )
        {
            auto data = buffer.front();
            buffer.pop();
            cout << data <<" \n";

            this_thread::sleep_for(chrono::milliseconds(300)); 
        }                

     }

}

int main()
{
    running = true;
    thread producer = thread([](){ ProducerThread(); }); 
    thread consumer = thread([](){ ConsumerThread(); });

    while(!getch())
    { }    

    running = false;
    producer.join();
    consumer.join();  
}

另一个值得一提的错误是您的消费者仅在缓冲区变空时通知等待中的生产者。

仅当队列已满时通知消费者的最佳方式。

例如:

template<class T, size_t MaxQueueSize>
class Queue
{
    std::condition_variable consumer_, producer_;
    std::mutex mutex_;
    using unique_lock = std::unique_lock<std::mutex>;

    std::queue<T> queue_;

public:
    template<class U>
    void push_back(U&& item) {
        unique_lock lock(mutex_);
        while(MaxQueueSize == queue_.size())
            producer_.wait(lock);
        queue_.push(std::forward<U>(item));
        consumer_.notify_one();
    }

    T pop_front() {
        unique_lock lock(mutex_);
        while(queue_.empty())
            consumer_.wait(lock);
        auto full = MaxQueueSize == queue_.size();
        auto item = queue_.front();
        queue_.pop();
        if(full)
            producer_.notify_all();
        return item;
    }
};