具有四个队列的多线程 producer/consumer

Multithreaded producer/consumer with four queues

我将 consumer/producer 问题与我的应用程序分开,以确保我的线程正常工作。

我有一个生产者线程和一个消费者线程池:在我的应用程序中,一个线程接受连接并将它们排队(在我的自定义结构中)在四个队列之一中,四个线程从队列中弹出并处理之前排队的连接;在这里,我的队列将包含 1 到 4 之间的随机 int,没有自定义结构。

四个 mutex 确保每个队列的数据保护(在打印队列大小时,在终端上加上一个互斥量 cout); a priority_queue 用于同步从四个队列中取出。生产者线程在右队列中推送一个新的 int 值,然后也推送到 priority_queue 中,这样当一个线程想要读取时,他首先需要从 priority_queue 中的 pop()为了了解什么队列被推送(因为它是排序的,在一些随机推送之后我的 priority_queue 看起来像 1 1 1 2 3 3 3 3 4 4,所以消费者线程会 pop(),看到值 1 并理解它必须从队列中移除 1).

为什么是四个队列?因为每个队列都有自己的优先级(1=最大,4=最小),所以在从队列2移除元素之前,应该先从队列1移除所有元素;所有其他队列的原因相同。由于这里我有一个从 1 到 4 的随机推值,所以应该没有饥饿。

编译 g++ -std=c++11 -o producer-consumer-multiqueue producer-consumer-multiqueue.cpp -pthread Ubuntu 14.04 x86_64,gcc 版本 4.8.4。

问题:抛开由于调度程序导致的奇怪输出,消费者线程没有按照我的意愿行事,因为正如您在下面的输出中看到的那样,它没有给出从队列 1 中删除元素的优先级,但 删除不遵循优先级(队列 1 最大,队列 4 分钟)。 我想在不使用外部库的情况下实现我的目标,no boost et similia.

(0 0 1 0) // (elements in queue 1, in queue 2, in queue 3, in queue 4)
(1 0 1 0)
(1 1 1 0)
(0 0 0 0)
(0 0 0 0)
(0 0 0 0)
(1 0 0 0)
(2 0 0 0)
(2 1 0 0)
(1 1 0 1)
(1 0 0 1)
(1 0 0 0)
(1 0 0 0)
(0 0 0 0)
(1 0 0 0)
...CTRL+c

代码:这是我的完整测试文件,可按原样编译和执行:

#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <random>

using namespace std;

// modify this to modify the number of consumer threads
#define WORKERS_THREADS     4
// max size of each of four queues
#define MAX_QUEUE_SIZE      100
// debug
#define DEFAULTCOLOR        "3[0m"
#define RED                 "3[22;31m"
#define YELLOW              "3[1;33m"
#define GREEN               "3[0;0;32m"

class MultiQueue {
    public:
        void initThreadPool(void);
        void insert(int num);
        void remove(void);
        void insertPriorityQueue(int num);
        int removePriorityQueue(void);
        void printQueues(string what);
        int getQueue1Size(void);
        int getQueue2Size(void);
        int getQueue3Size(void);
        int getQueue4Size(void);
        int getPrioQueueSize(void);
    private:
        vector<thread> workers;

        queue<int>q1;
        queue<int>q2;
        queue<int>q3;
        queue<int>q4;

        priority_queue<int, vector<int>, greater<int>> prioq;

        // mutex for push/pop in priority queue
        mutex priority_queue_mutex;
        // 4 mutexes for each queue
        mutex m1, m2, m3, m4;
        // mutex for printing 4 queues size
        mutex print;

        // mutex for push/pop to priority_queue
        condition_variable prioq_cond;
        // 4 conds for consumer threads
        condition_variable w1, w2, w3, w4;
};

int MultiQueue::getQueue1Size() { return q1.size(); }

int MultiQueue::getQueue2Size() { return q2.size(); }

int MultiQueue::getQueue3Size() { return q3.size(); }

int MultiQueue::getQueue4Size() { return q4.size(); }

int MultiQueue::getPrioQueueSize() { return prioq.size(); }

void MultiQueue::initThreadPool(void) {
    for (int i=0; i<WORKERS_THREADS; i++) {
        workers.push_back(thread(&MultiQueue::remove, this));
        workers[i].detach();
    }
}

void MultiQueue::printQueues(string what) {
    lock_guard<mutex> l(print);
    if (what == "insert")
        cout << GREEN << '(' << getQueue1Size() << ' ' << getQueue2Size() << ' ' << getQueue3Size() << ' ' << getQueue4Size() << ')' << DEFAULTCOLOR << '\n' << flush;
    else
        cout << YELLOW << '(' << getQueue1Size() << ' ' << getQueue2Size() << ' ' << getQueue3Size() << ' ' << getQueue4Size() << ')' << DEFAULTCOLOR << '\n' << flush;
}

// called from producer thread to tell consumer threads 
// what queues to pop() from
void MultiQueue::insertPriorityQueue(int num) {
    lock_guard<mutex> prio(priority_queue_mutex);
    prioq.push(num);
    prioq_cond.notify_one();
}

// called from consumer threads to see what queues 
// have elements to pop() from
int MultiQueue::removePriorityQueue(void) {
    int ret = 0;
    unique_lock<mutex> prio(priority_queue_mutex);
    prioq_cond.wait(prio, [this] () { return getPrioQueueSize() > 0; });
    ret = prioq.top();
    prioq.pop();
    return ret;
}

// producer thread 
void MultiQueue::insert(int num) {
    switch(num) {
        case 1: {
            unique_lock<mutex> locker(m1);
            w1.wait(locker, [this] () { return getQueue1Size() < MAX_QUEUE_SIZE; });
            q1.push(num);
            break;
        }
        case 2: {
            unique_lock<mutex> locker(m2);
            w2.wait(locker, [this] () { return getQueue2Size() < MAX_QUEUE_SIZE; });
            q2.push(num);
            break;
        }
        case 3: {
            unique_lock<mutex> locker(m3);
            w3.wait(locker, [this] () { return getQueue3Size() < MAX_QUEUE_SIZE; });
            q3.push(num);
            break;      
        }
        case 4: {
            unique_lock<mutex> locker(m4);
            w4.wait(locker, [this] () { return getQueue4Size() < MAX_QUEUE_SIZE; });
            q4.push(num);
            break;
        }
        default: {
            cout << "number not 1, 2, 3 nor 4: " << num << '\n' << flush;
            break;
        }
    }
    printQueues("insert");
    insertPriorityQueue(num);
}

void MultiQueue::remove(void) {
    int which_queue = 0;
    while (true) {
        which_queue = removePriorityQueue();
        switch (which_queue) {
            case 1: {
                lock_guard<mutex> lock(m1);
                int ret = q1.front();
                q1.pop();
                printQueues("remove");
                break;
            }
            case 2: {
                lock_guard<mutex> lock(m2);
                int ret = q2.front();
                q2.pop();
                printQueues("remove");
                break;
            }
            case 3: {
                lock_guard<mutex> lock(m3);
                int ret = q3.front();
                q3.pop();
                printQueues("remove");
                break;
            }
            case 4: {
                lock_guard<mutex> lock(m4);
                int ret = q4.front();
                q4.pop();
                printQueues("remove");
                break;
            }
            default: {
                break;
            }
        }
    }
}

int main(void) {
    int random_num = 0;

    MultiQueue mq;
    mq.initThreadPool();

    default_random_engine eng((random_device())());
    uniform_int_distribution<int> idis(1, 4);
    while (true) {
        random_num = idis(eng);
        mq.insert(random_num);
    }

    return 0;
}

我在您的代码中发现了以下问题:

  1. 打印不一定反映元素弹出的顺序。一个线程从队列中提取元素,然后可以长时间等待 print 锁,而另一个在之后获得元素的线程可以是第一个获得 print 锁的线程。
  2. 与优先队列类似的问题。可能会出现这样的情况:第一个线程从优先队列中拿到元素,知道应该弹出queue1,然后第一个线程被调度器关闭,第二个线程开始工作。它还弹出优先级队列,然后继续弹出 queue2(当第一个线程关闭时)。

我会听从评论的建议并使用单个 priority_queue<std::pair<int,int>>,其中 std::pair<int,int> 的第一个元素是优先级,第二个元素是有效载荷。这将帮助您处理问题 2。 至于问题 1,您应该在与 pop 东西相同的锁下打印东西。