多个生产者和消费者但共享一个资源 - 只有一个线程 运行
Multiple producers and consumers but one shared reource - Only one thread is running
这里我创建了2个生产者线程和2个消费者线程。他们只在一个共享队列中放入和取出值。
问题是第一个生产者确实填写,然后进入等待模式。
此后没有其他线程运行。请解释我遗漏了什么。
#include "mainwindow.h"
#include <QApplication>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <iostream>
#include <QDebug>
pthread_mutex_t mutexVariable = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t conditionVariable = PTHREAD_COND_INITIALIZER;
int numberOfActiveProducers;
int numberOfActiveConsumers;
QList <int> sharedQueue;
/*
* `sharedQueue`'s size is assumed to be 10 ATM.
* `sharedQueue` is supposed to be shared among two threads.
* Producer threads will put the 1's in it, and Consumer threads will remove the 1's.
* Assumption: `sharedQueue` can contain only 10 elements at a time.
*/
int sizeOfSharedQueue;
// This function is run by the thread `Producer A`.
void *threadProducerAFunction (void *arg) {
Q_UNUSED (arg);
while (1) {
qDebug () << "\nProducer A";
pthread_mutex_lock (&mutexVariable);
if (sharedQueue.length () < 10) {
sharedQueue.push_back (1);
qDebug () << "\nPushed by Producer A: Length of queue is: " << sharedQueue.length ();
}
else {
qDebug () << "\nProducer A has done its bit and is now in waiting mode. Length of queue is: " << sharedQueue.length ();
pthread_cond_wait (&conditionVariable, &mutexVariable);
}
pthread_mutex_unlock (&mutexVariable);
}
return NULL;
}
// This function is run by the thread `ProducerB`.
void *threadProducerBFunction (void *arg) {
Q_UNUSED (arg);
while (1) {
qDebug () << "\nProducer B";
pthread_mutex_lock (&mutexVariable);
if (sharedQueue.length () < 10) {
sharedQueue.push_back (1);
qDebug () << "\nPushed by Producer B: Length of queue is: " << sharedQueue.length ();
}
else {
qDebug () << "\nProducer B has done its bit and is now in waiting mode. Length of queue is: " << sharedQueue.length ();
pthread_cond_wait (&conditionVariable, &mutexVariable);
}
pthread_mutex_unlock (&mutexVariable);
}
return NULL;
}
// This function is run by the thread `Consumer A`.
void *threadConsumerAFunction (void *arg) {
Q_UNUSED (arg);
while (1) {
qDebug () << "\nConsumer A";
pthread_mutex_lock (&mutexVariable);
if (sharedQueue.length () > 0) {
sharedQueue.pop_front ();
qDebug () << "\nRemoved by thread Consumer A. Length of queue is: " << sharedQueue.length ();
}
else {
pthread_cond_signal (&conditionVariable);
qDebug () << "\nSignal issued by thread Consumer A. Length of queue is: " << sharedQueue.length ();
}
pthread_mutex_unlock (&mutexVariable);
}
return NULL;
}
// This function is run by the thread `Consumer B`.
void *threadConsumerBFunction (void *arg) {
Q_UNUSED (arg);
while (1) {
qDebug () << "\nConsumer B";
pthread_mutex_lock (&mutexVariable);
if (sharedQueue.length () > 0) {
sharedQueue.pop_front ();
qDebug () << "\nRemoved by thread Consumer B. Length of queue is: " << sharedQueue.length ();
}
else {
pthread_cond_signal (&conditionVariable);
qDebug () << "\nSignal issued by thread Consumer B. Length of queue is: " << sharedQueue.length ();
}
pthread_mutex_unlock (&mutexVariable);
}
return NULL;
}
int main (int argc, char *argv[]) {
numberOfActiveProducers = 2;
numberOfActiveConsumers = 2;
sizeOfSharedQueue = 10;
// `sharedQueue` initialization by 0.
for (int i = 0; i < sizeOfSharedQueue; i++) {
sharedQueue.push_back (0);
}
// Producer threads creation and joining
pthread_t producerA;
pthread_t producerB;
if (pthread_create (&producerA, NULL, threadProducerAFunction, NULL)) {
fprintf (stderr, "Error creating thread Producer A\n");
return 1;
}
if (pthread_join (producerA, NULL)) {
fprintf (stderr, "Error joining thread Producer A\n");
return 2;
}
if (pthread_create (&producerB, NULL, threadProducerBFunction, NULL)) {
fprintf (stderr, "Error creating thread Producer A\n");
return 1;
}
if (pthread_join (producerB, NULL)) {
fprintf (stderr, "Error joining thread Producer B\n");
return 2;
}
// Consumer threads creation and joining
pthread_t consumerA;
pthread_t consumerB;
if (pthread_create (&consumerA, NULL, threadConsumerAFunction, NULL)) {
fprintf (stderr, "Error creating thread Consumer A\n");
return 1;
}
if (pthread_join (consumerA, NULL)) {
fprintf (stderr, "Error joining thread Consumer A\n");
return 2;
}
if (pthread_create (&consumerB, NULL, threadConsumerBFunction, NULL)) {
fprintf (stderr, "Error creating thread Consumer B\n");
return 1;
}
if (pthread_join (consumerB, NULL)) {
fprintf (stderr, "Error joining thread Consumer B\n");
return 2;
}
QApplication a (argc, argv);
MainWindow w;
w.show ();
return a.exec ();
}
问题是 main
中的每个 pthread_create
调用之后有 pthread_join
个调用。 pthread_join
根据定义将阻塞,直到它正在等待的线程退出。由于 none 个子线程退出,结果是第一个 pthread_join
调用将无限期阻塞,因此执行 none 个后续 pthread_create
调用。
一个解决方法是删除所有 pthread_join
调用。 pthread_join
通常用于等待并获取子线程的 return 状态或同步主线程,使其不会在子线程完成之前退出。所以在这种情况下实际上不需要那些 pthread_join
调用,因为子线程不会退出,而主线程调用 a.exec()
来执行防止它退出的任务。
与实际问题无关,但我看到您实际上已经为每个线程复制了生产者和消费者代码。这是不必要的,因为同一个线程函数可以传递给多个 pthread_create
调用(只要没有静态变量)。如果您想区分实例以进行调试,请使用线程 ID 或将不同的 arg
传递给每个线程以进行识别。
您正在混合框架。
如果您已经在使用 Qt
,它会附带一整套线程 类,让生活变得更轻松。
我已经将您的代码转换为 Qt
等价物并且它可以正常工作。
#include <QtCore/qthread.h>
#include <QtWidgets/QApplication>
#include <QtCore/qmutex.h>
#include <QtCore/qwaitcondition.h>
#include <QtCore/QList.h>
#include <QtCore/qdebug.h>
QMutex mutex;
QWaitCondition waitCondition;
int numberOfActiveProducers;
int numberOfActiveConsumers;
QList<int> sharedQueue;
class Producer : public QThread{
public:
Producer(QString const &label) : label(label) {}
void run() {
forever {
qDebug() << QString("\nProducer %1").arg(label);
QMutexLocker locker(&mutex);
if (sharedQueue.length() < 10){
sharedQueue << 1;
qDebug() << QString("\nPushed by Producer %1: Length of queue is: %2").arg(label).arg(sharedQueue.length());
} else {
qDebug() << QString("\nProducer %1 has done its job and is now in waiting mode. Length of queue is: %2").arg(label).arg(sharedQueue.length());
waitCondition.wait(&mutex);
}
}
}
private:
QString label;
};
class Consumer : public QThread{
public:
Consumer(QString const &label) : label(label) {}
void run(){
forever {
qDebug() << QString("\nConsumer %1").arg(label);
QMutexLocker locker(&mutex);
if (sharedQueue.length() > 0){
sharedQueue.takeFirst();
qDebug() << QString("\nRemoved by thread Consumer %1. Length of queue is: %2").arg(label).arg(sharedQueue.length());
} else {
waitCondition.wakeAll();
qDebug() << QString("\nSignal issued by thread Consumer %1. Length of queue is: %2").arg(label).arg(sharedQueue.length());
}
}
}
private:
QString label;
};
int main(int argc, char **argv){
numberOfActiveConsumers = 2;
numberOfActiveProducers = 2;
QCoreApplication a(argc, argv);
Producer producerA("A");
Producer producerB("B");
Consumer consumerA("A");
Consumer consumerB("B");
producerA.start();
producerB.start();
consumerA.start();
consumerB.start();
return a.exec();
}
我必须补充一点,我通常不会将 QThread
sub类 用于一个简单的函数,除非我需要直接访问线程。通常,我会子类化 QRunnable
并将对象提供给 QThreadPool
以开始。
这里我创建了2个生产者线程和2个消费者线程。他们只在一个共享队列中放入和取出值。
问题是第一个生产者确实填写,然后进入等待模式。
此后没有其他线程运行。请解释我遗漏了什么。
#include "mainwindow.h"
#include <QApplication>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <iostream>
#include <QDebug>
pthread_mutex_t mutexVariable = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t conditionVariable = PTHREAD_COND_INITIALIZER;
int numberOfActiveProducers;
int numberOfActiveConsumers;
QList <int> sharedQueue;
/*
* `sharedQueue`'s size is assumed to be 10 ATM.
* `sharedQueue` is supposed to be shared among two threads.
* Producer threads will put the 1's in it, and Consumer threads will remove the 1's.
* Assumption: `sharedQueue` can contain only 10 elements at a time.
*/
int sizeOfSharedQueue;
// This function is run by the thread `Producer A`.
void *threadProducerAFunction (void *arg) {
Q_UNUSED (arg);
while (1) {
qDebug () << "\nProducer A";
pthread_mutex_lock (&mutexVariable);
if (sharedQueue.length () < 10) {
sharedQueue.push_back (1);
qDebug () << "\nPushed by Producer A: Length of queue is: " << sharedQueue.length ();
}
else {
qDebug () << "\nProducer A has done its bit and is now in waiting mode. Length of queue is: " << sharedQueue.length ();
pthread_cond_wait (&conditionVariable, &mutexVariable);
}
pthread_mutex_unlock (&mutexVariable);
}
return NULL;
}
// This function is run by the thread `ProducerB`.
void *threadProducerBFunction (void *arg) {
Q_UNUSED (arg);
while (1) {
qDebug () << "\nProducer B";
pthread_mutex_lock (&mutexVariable);
if (sharedQueue.length () < 10) {
sharedQueue.push_back (1);
qDebug () << "\nPushed by Producer B: Length of queue is: " << sharedQueue.length ();
}
else {
qDebug () << "\nProducer B has done its bit and is now in waiting mode. Length of queue is: " << sharedQueue.length ();
pthread_cond_wait (&conditionVariable, &mutexVariable);
}
pthread_mutex_unlock (&mutexVariable);
}
return NULL;
}
// This function is run by the thread `Consumer A`.
void *threadConsumerAFunction (void *arg) {
Q_UNUSED (arg);
while (1) {
qDebug () << "\nConsumer A";
pthread_mutex_lock (&mutexVariable);
if (sharedQueue.length () > 0) {
sharedQueue.pop_front ();
qDebug () << "\nRemoved by thread Consumer A. Length of queue is: " << sharedQueue.length ();
}
else {
pthread_cond_signal (&conditionVariable);
qDebug () << "\nSignal issued by thread Consumer A. Length of queue is: " << sharedQueue.length ();
}
pthread_mutex_unlock (&mutexVariable);
}
return NULL;
}
// This function is run by the thread `Consumer B`.
void *threadConsumerBFunction (void *arg) {
Q_UNUSED (arg);
while (1) {
qDebug () << "\nConsumer B";
pthread_mutex_lock (&mutexVariable);
if (sharedQueue.length () > 0) {
sharedQueue.pop_front ();
qDebug () << "\nRemoved by thread Consumer B. Length of queue is: " << sharedQueue.length ();
}
else {
pthread_cond_signal (&conditionVariable);
qDebug () << "\nSignal issued by thread Consumer B. Length of queue is: " << sharedQueue.length ();
}
pthread_mutex_unlock (&mutexVariable);
}
return NULL;
}
int main (int argc, char *argv[]) {
numberOfActiveProducers = 2;
numberOfActiveConsumers = 2;
sizeOfSharedQueue = 10;
// `sharedQueue` initialization by 0.
for (int i = 0; i < sizeOfSharedQueue; i++) {
sharedQueue.push_back (0);
}
// Producer threads creation and joining
pthread_t producerA;
pthread_t producerB;
if (pthread_create (&producerA, NULL, threadProducerAFunction, NULL)) {
fprintf (stderr, "Error creating thread Producer A\n");
return 1;
}
if (pthread_join (producerA, NULL)) {
fprintf (stderr, "Error joining thread Producer A\n");
return 2;
}
if (pthread_create (&producerB, NULL, threadProducerBFunction, NULL)) {
fprintf (stderr, "Error creating thread Producer A\n");
return 1;
}
if (pthread_join (producerB, NULL)) {
fprintf (stderr, "Error joining thread Producer B\n");
return 2;
}
// Consumer threads creation and joining
pthread_t consumerA;
pthread_t consumerB;
if (pthread_create (&consumerA, NULL, threadConsumerAFunction, NULL)) {
fprintf (stderr, "Error creating thread Consumer A\n");
return 1;
}
if (pthread_join (consumerA, NULL)) {
fprintf (stderr, "Error joining thread Consumer A\n");
return 2;
}
if (pthread_create (&consumerB, NULL, threadConsumerBFunction, NULL)) {
fprintf (stderr, "Error creating thread Consumer B\n");
return 1;
}
if (pthread_join (consumerB, NULL)) {
fprintf (stderr, "Error joining thread Consumer B\n");
return 2;
}
QApplication a (argc, argv);
MainWindow w;
w.show ();
return a.exec ();
}
问题是 main
中的每个 pthread_create
调用之后有 pthread_join
个调用。 pthread_join
根据定义将阻塞,直到它正在等待的线程退出。由于 none 个子线程退出,结果是第一个 pthread_join
调用将无限期阻塞,因此执行 none 个后续 pthread_create
调用。
一个解决方法是删除所有 pthread_join
调用。 pthread_join
通常用于等待并获取子线程的 return 状态或同步主线程,使其不会在子线程完成之前退出。所以在这种情况下实际上不需要那些 pthread_join
调用,因为子线程不会退出,而主线程调用 a.exec()
来执行防止它退出的任务。
与实际问题无关,但我看到您实际上已经为每个线程复制了生产者和消费者代码。这是不必要的,因为同一个线程函数可以传递给多个 pthread_create
调用(只要没有静态变量)。如果您想区分实例以进行调试,请使用线程 ID 或将不同的 arg
传递给每个线程以进行识别。
您正在混合框架。
如果您已经在使用 Qt
,它会附带一整套线程 类,让生活变得更轻松。
我已经将您的代码转换为 Qt
等价物并且它可以正常工作。
#include <QtCore/qthread.h>
#include <QtWidgets/QApplication>
#include <QtCore/qmutex.h>
#include <QtCore/qwaitcondition.h>
#include <QtCore/QList.h>
#include <QtCore/qdebug.h>
QMutex mutex;
QWaitCondition waitCondition;
int numberOfActiveProducers;
int numberOfActiveConsumers;
QList<int> sharedQueue;
class Producer : public QThread{
public:
Producer(QString const &label) : label(label) {}
void run() {
forever {
qDebug() << QString("\nProducer %1").arg(label);
QMutexLocker locker(&mutex);
if (sharedQueue.length() < 10){
sharedQueue << 1;
qDebug() << QString("\nPushed by Producer %1: Length of queue is: %2").arg(label).arg(sharedQueue.length());
} else {
qDebug() << QString("\nProducer %1 has done its job and is now in waiting mode. Length of queue is: %2").arg(label).arg(sharedQueue.length());
waitCondition.wait(&mutex);
}
}
}
private:
QString label;
};
class Consumer : public QThread{
public:
Consumer(QString const &label) : label(label) {}
void run(){
forever {
qDebug() << QString("\nConsumer %1").arg(label);
QMutexLocker locker(&mutex);
if (sharedQueue.length() > 0){
sharedQueue.takeFirst();
qDebug() << QString("\nRemoved by thread Consumer %1. Length of queue is: %2").arg(label).arg(sharedQueue.length());
} else {
waitCondition.wakeAll();
qDebug() << QString("\nSignal issued by thread Consumer %1. Length of queue is: %2").arg(label).arg(sharedQueue.length());
}
}
}
private:
QString label;
};
int main(int argc, char **argv){
numberOfActiveConsumers = 2;
numberOfActiveProducers = 2;
QCoreApplication a(argc, argv);
Producer producerA("A");
Producer producerB("B");
Consumer consumerA("A");
Consumer consumerB("B");
producerA.start();
producerB.start();
consumerA.start();
consumerB.start();
return a.exec();
}
我必须补充一点,我通常不会将 QThread
sub类 用于一个简单的函数,除非我需要直接访问线程。通常,我会子类化 QRunnable
并将对象提供给 QThreadPool
以开始。