具有四个队列的多线程 producer/consumer
Multithreaded producer/consumer with four queues
我将 consumer/producer 问题与我的应用程序分开,以确保我的线程正常工作。
我有一个生产者线程和一个消费者线程池:在我的应用程序中,一个线程接受连接并将它们排队(在我的自定义结构中)在四个队列之一中,四个线程从队列中弹出并处理之前排队的连接;在这里,我的队列将包含 1 到 4 之间的随机 int
,没有自定义结构。
四个 mutex
确保每个队列的数据保护(在打印队列大小时,在终端上加上一个互斥量 cout
); a priority_queue
用于同步从四个队列中取出。生产者线程在右队列中推送一个新的 int
值,然后也推送到 priority_queue
中,这样当一个线程想要读取时,他首先需要从 priority_queue
中的 pop()
为了了解什么队列被推送(因为它是排序的,在一些随机推送之后我的 priority_queue
看起来像 1 1 1 2 3 3 3 3 4 4
,所以消费者线程会 pop()
,看到值 1
并理解它必须从队列中移除 1
).
为什么是四个队列?因为每个队列都有自己的优先级(1=最大,4=最小),所以在从队列2移除元素之前,应该先从队列1移除所有元素;所有其他队列的原因相同。由于这里我有一个从 1 到 4 的随机推值,所以应该没有饥饿。
编译 :g++ -std=c++11 -o producer-consumer-multiqueue producer-consumer-multiqueue.cpp -pthread
Ubuntu 14.04 x86_64,gcc 版本 4.8.4。
问题:抛开由于调度程序导致的奇怪输出,消费者线程没有按照我的意愿行事,因为正如您在下面的输出中看到的那样,它没有给出从队列 1 中删除元素的优先级,但 删除不遵循优先级(队列 1 最大,队列 4 分钟)。
我想在不使用外部库的情况下实现我的目标,no boost
et similia.
(0 0 1 0) // (elements in queue 1, in queue 2, in queue 3, in queue 4)
(1 0 1 0)
(1 1 1 0)
(0 0 0 0)
(0 0 0 0)
(0 0 0 0)
(1 0 0 0)
(2 0 0 0)
(2 1 0 0)
(1 1 0 1)
(1 0 0 1)
(1 0 0 0)
(1 0 0 0)
(0 0 0 0)
(1 0 0 0)
...CTRL+c
代码:这是我的完整测试文件,可按原样编译和执行:
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <random>
using namespace std;
// modify this to modify the number of consumer threads
#define WORKERS_THREADS 4
// max size of each of four queues
#define MAX_QUEUE_SIZE 100
// debug
#define DEFAULTCOLOR "3[0m"
#define RED "3[22;31m"
#define YELLOW "3[1;33m"
#define GREEN "3[0;0;32m"
class MultiQueue {
public:
void initThreadPool(void);
void insert(int num);
void remove(void);
void insertPriorityQueue(int num);
int removePriorityQueue(void);
void printQueues(string what);
int getQueue1Size(void);
int getQueue2Size(void);
int getQueue3Size(void);
int getQueue4Size(void);
int getPrioQueueSize(void);
private:
vector<thread> workers;
queue<int>q1;
queue<int>q2;
queue<int>q3;
queue<int>q4;
priority_queue<int, vector<int>, greater<int>> prioq;
// mutex for push/pop in priority queue
mutex priority_queue_mutex;
// 4 mutexes for each queue
mutex m1, m2, m3, m4;
// mutex for printing 4 queues size
mutex print;
// mutex for push/pop to priority_queue
condition_variable prioq_cond;
// 4 conds for consumer threads
condition_variable w1, w2, w3, w4;
};
int MultiQueue::getQueue1Size() { return q1.size(); }
int MultiQueue::getQueue2Size() { return q2.size(); }
int MultiQueue::getQueue3Size() { return q3.size(); }
int MultiQueue::getQueue4Size() { return q4.size(); }
int MultiQueue::getPrioQueueSize() { return prioq.size(); }
void MultiQueue::initThreadPool(void) {
for (int i=0; i<WORKERS_THREADS; i++) {
workers.push_back(thread(&MultiQueue::remove, this));
workers[i].detach();
}
}
void MultiQueue::printQueues(string what) {
lock_guard<mutex> l(print);
if (what == "insert")
cout << GREEN << '(' << getQueue1Size() << ' ' << getQueue2Size() << ' ' << getQueue3Size() << ' ' << getQueue4Size() << ')' << DEFAULTCOLOR << '\n' << flush;
else
cout << YELLOW << '(' << getQueue1Size() << ' ' << getQueue2Size() << ' ' << getQueue3Size() << ' ' << getQueue4Size() << ')' << DEFAULTCOLOR << '\n' << flush;
}
// called from producer thread to tell consumer threads
// what queues to pop() from
void MultiQueue::insertPriorityQueue(int num) {
lock_guard<mutex> prio(priority_queue_mutex);
prioq.push(num);
prioq_cond.notify_one();
}
// called from consumer threads to see what queues
// have elements to pop() from
int MultiQueue::removePriorityQueue(void) {
int ret = 0;
unique_lock<mutex> prio(priority_queue_mutex);
prioq_cond.wait(prio, [this] () { return getPrioQueueSize() > 0; });
ret = prioq.top();
prioq.pop();
return ret;
}
// producer thread
void MultiQueue::insert(int num) {
switch(num) {
case 1: {
unique_lock<mutex> locker(m1);
w1.wait(locker, [this] () { return getQueue1Size() < MAX_QUEUE_SIZE; });
q1.push(num);
break;
}
case 2: {
unique_lock<mutex> locker(m2);
w2.wait(locker, [this] () { return getQueue2Size() < MAX_QUEUE_SIZE; });
q2.push(num);
break;
}
case 3: {
unique_lock<mutex> locker(m3);
w3.wait(locker, [this] () { return getQueue3Size() < MAX_QUEUE_SIZE; });
q3.push(num);
break;
}
case 4: {
unique_lock<mutex> locker(m4);
w4.wait(locker, [this] () { return getQueue4Size() < MAX_QUEUE_SIZE; });
q4.push(num);
break;
}
default: {
cout << "number not 1, 2, 3 nor 4: " << num << '\n' << flush;
break;
}
}
printQueues("insert");
insertPriorityQueue(num);
}
void MultiQueue::remove(void) {
int which_queue = 0;
while (true) {
which_queue = removePriorityQueue();
switch (which_queue) {
case 1: {
lock_guard<mutex> lock(m1);
int ret = q1.front();
q1.pop();
printQueues("remove");
break;
}
case 2: {
lock_guard<mutex> lock(m2);
int ret = q2.front();
q2.pop();
printQueues("remove");
break;
}
case 3: {
lock_guard<mutex> lock(m3);
int ret = q3.front();
q3.pop();
printQueues("remove");
break;
}
case 4: {
lock_guard<mutex> lock(m4);
int ret = q4.front();
q4.pop();
printQueues("remove");
break;
}
default: {
break;
}
}
}
}
int main(void) {
int random_num = 0;
MultiQueue mq;
mq.initThreadPool();
default_random_engine eng((random_device())());
uniform_int_distribution<int> idis(1, 4);
while (true) {
random_num = idis(eng);
mq.insert(random_num);
}
return 0;
}
我在您的代码中发现了以下问题:
- 打印不一定反映元素弹出的顺序。一个线程从队列中提取元素,然后可以长时间等待
print
锁,而另一个在之后获得元素的线程可以是第一个获得 print
锁的线程。
- 与优先队列类似的问题。可能会出现这样的情况:第一个线程从优先队列中拿到元素,知道应该弹出
queue1
,然后第一个线程被调度器关闭,第二个线程开始工作。它还弹出优先级队列,然后继续弹出 queue2
(当第一个线程关闭时)。
我会听从评论的建议并使用单个 priority_queue<std::pair<int,int>>
,其中 std::pair<int,int>
的第一个元素是优先级,第二个元素是有效载荷。这将帮助您处理问题 2。
至于问题 1,您应该在与 pop
东西相同的锁下打印东西。
我将 consumer/producer 问题与我的应用程序分开,以确保我的线程正常工作。
我有一个生产者线程和一个消费者线程池:在我的应用程序中,一个线程接受连接并将它们排队(在我的自定义结构中)在四个队列之一中,四个线程从队列中弹出并处理之前排队的连接;在这里,我的队列将包含 1 到 4 之间的随机 int
,没有自定义结构。
四个 mutex
确保每个队列的数据保护(在打印队列大小时,在终端上加上一个互斥量 cout
); a priority_queue
用于同步从四个队列中取出。生产者线程在右队列中推送一个新的 int
值,然后也推送到 priority_queue
中,这样当一个线程想要读取时,他首先需要从 priority_queue
中的 pop()
为了了解什么队列被推送(因为它是排序的,在一些随机推送之后我的 priority_queue
看起来像 1 1 1 2 3 3 3 3 4 4
,所以消费者线程会 pop()
,看到值 1
并理解它必须从队列中移除 1
).
为什么是四个队列?因为每个队列都有自己的优先级(1=最大,4=最小),所以在从队列2移除元素之前,应该先从队列1移除所有元素;所有其他队列的原因相同。由于这里我有一个从 1 到 4 的随机推值,所以应该没有饥饿。
编译 :g++ -std=c++11 -o producer-consumer-multiqueue producer-consumer-multiqueue.cpp -pthread
Ubuntu 14.04 x86_64,gcc 版本 4.8.4。
问题:抛开由于调度程序导致的奇怪输出,消费者线程没有按照我的意愿行事,因为正如您在下面的输出中看到的那样,它没有给出从队列 1 中删除元素的优先级,但 删除不遵循优先级(队列 1 最大,队列 4 分钟)。
我想在不使用外部库的情况下实现我的目标,no boost
et similia.
(0 0 1 0) // (elements in queue 1, in queue 2, in queue 3, in queue 4)
(1 0 1 0)
(1 1 1 0)
(0 0 0 0)
(0 0 0 0)
(0 0 0 0)
(1 0 0 0)
(2 0 0 0)
(2 1 0 0)
(1 1 0 1)
(1 0 0 1)
(1 0 0 0)
(1 0 0 0)
(0 0 0 0)
(1 0 0 0)
...CTRL+c
代码:这是我的完整测试文件,可按原样编译和执行:
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <random>
using namespace std;
// modify this to modify the number of consumer threads
#define WORKERS_THREADS 4
// max size of each of four queues
#define MAX_QUEUE_SIZE 100
// debug
#define DEFAULTCOLOR "3[0m"
#define RED "3[22;31m"
#define YELLOW "3[1;33m"
#define GREEN "3[0;0;32m"
class MultiQueue {
public:
void initThreadPool(void);
void insert(int num);
void remove(void);
void insertPriorityQueue(int num);
int removePriorityQueue(void);
void printQueues(string what);
int getQueue1Size(void);
int getQueue2Size(void);
int getQueue3Size(void);
int getQueue4Size(void);
int getPrioQueueSize(void);
private:
vector<thread> workers;
queue<int>q1;
queue<int>q2;
queue<int>q3;
queue<int>q4;
priority_queue<int, vector<int>, greater<int>> prioq;
// mutex for push/pop in priority queue
mutex priority_queue_mutex;
// 4 mutexes for each queue
mutex m1, m2, m3, m4;
// mutex for printing 4 queues size
mutex print;
// mutex for push/pop to priority_queue
condition_variable prioq_cond;
// 4 conds for consumer threads
condition_variable w1, w2, w3, w4;
};
int MultiQueue::getQueue1Size() { return q1.size(); }
int MultiQueue::getQueue2Size() { return q2.size(); }
int MultiQueue::getQueue3Size() { return q3.size(); }
int MultiQueue::getQueue4Size() { return q4.size(); }
int MultiQueue::getPrioQueueSize() { return prioq.size(); }
void MultiQueue::initThreadPool(void) {
for (int i=0; i<WORKERS_THREADS; i++) {
workers.push_back(thread(&MultiQueue::remove, this));
workers[i].detach();
}
}
void MultiQueue::printQueues(string what) {
lock_guard<mutex> l(print);
if (what == "insert")
cout << GREEN << '(' << getQueue1Size() << ' ' << getQueue2Size() << ' ' << getQueue3Size() << ' ' << getQueue4Size() << ')' << DEFAULTCOLOR << '\n' << flush;
else
cout << YELLOW << '(' << getQueue1Size() << ' ' << getQueue2Size() << ' ' << getQueue3Size() << ' ' << getQueue4Size() << ')' << DEFAULTCOLOR << '\n' << flush;
}
// called from producer thread to tell consumer threads
// what queues to pop() from
void MultiQueue::insertPriorityQueue(int num) {
lock_guard<mutex> prio(priority_queue_mutex);
prioq.push(num);
prioq_cond.notify_one();
}
// called from consumer threads to see what queues
// have elements to pop() from
int MultiQueue::removePriorityQueue(void) {
int ret = 0;
unique_lock<mutex> prio(priority_queue_mutex);
prioq_cond.wait(prio, [this] () { return getPrioQueueSize() > 0; });
ret = prioq.top();
prioq.pop();
return ret;
}
// producer thread
void MultiQueue::insert(int num) {
switch(num) {
case 1: {
unique_lock<mutex> locker(m1);
w1.wait(locker, [this] () { return getQueue1Size() < MAX_QUEUE_SIZE; });
q1.push(num);
break;
}
case 2: {
unique_lock<mutex> locker(m2);
w2.wait(locker, [this] () { return getQueue2Size() < MAX_QUEUE_SIZE; });
q2.push(num);
break;
}
case 3: {
unique_lock<mutex> locker(m3);
w3.wait(locker, [this] () { return getQueue3Size() < MAX_QUEUE_SIZE; });
q3.push(num);
break;
}
case 4: {
unique_lock<mutex> locker(m4);
w4.wait(locker, [this] () { return getQueue4Size() < MAX_QUEUE_SIZE; });
q4.push(num);
break;
}
default: {
cout << "number not 1, 2, 3 nor 4: " << num << '\n' << flush;
break;
}
}
printQueues("insert");
insertPriorityQueue(num);
}
void MultiQueue::remove(void) {
int which_queue = 0;
while (true) {
which_queue = removePriorityQueue();
switch (which_queue) {
case 1: {
lock_guard<mutex> lock(m1);
int ret = q1.front();
q1.pop();
printQueues("remove");
break;
}
case 2: {
lock_guard<mutex> lock(m2);
int ret = q2.front();
q2.pop();
printQueues("remove");
break;
}
case 3: {
lock_guard<mutex> lock(m3);
int ret = q3.front();
q3.pop();
printQueues("remove");
break;
}
case 4: {
lock_guard<mutex> lock(m4);
int ret = q4.front();
q4.pop();
printQueues("remove");
break;
}
default: {
break;
}
}
}
}
int main(void) {
int random_num = 0;
MultiQueue mq;
mq.initThreadPool();
default_random_engine eng((random_device())());
uniform_int_distribution<int> idis(1, 4);
while (true) {
random_num = idis(eng);
mq.insert(random_num);
}
return 0;
}
我在您的代码中发现了以下问题:
- 打印不一定反映元素弹出的顺序。一个线程从队列中提取元素,然后可以长时间等待
print
锁,而另一个在之后获得元素的线程可以是第一个获得print
锁的线程。 - 与优先队列类似的问题。可能会出现这样的情况:第一个线程从优先队列中拿到元素,知道应该弹出
queue1
,然后第一个线程被调度器关闭,第二个线程开始工作。它还弹出优先级队列,然后继续弹出queue2
(当第一个线程关闭时)。
我会听从评论的建议并使用单个 priority_queue<std::pair<int,int>>
,其中 std::pair<int,int>
的第一个元素是优先级,第二个元素是有效载荷。这将帮助您处理问题 2。
至于问题 1,您应该在与 pop
东西相同的锁下打印东西。