在使用 pthread 和信号量实现生产者-消费者问题时需要帮助
Need help in implementing the Producer-Consumer problem with pthread and semaphore
我正在尝试使用 pthread 和信号量在 C++ 中实现生产者和消费者问题。我有一个生产者和两个消费者。我的生产者从文件中读取一个字符串并将其逐个字符地存储在队列中。消费者也一个一个地从字符串中读取并存储在一个字符中。问题是我的 Consumer 中只有一个正在从队列中读取,另一个不是,它的数组仍然是空的。我该如何解决这个问题。这是我的程序:
#include<iostream>
#include<pthread.h>
#include<fstream>
#include<unistd.h>
#include<semaphore.h>
#include<queue>
// define queue size
#define QUEUE_SIZE 5
// declare and initialize semaphore and read/write counter
static sem_t mutex,mutex1;
//static int counter = 0;
// Queue for saving characters
static std::queue<char> charQueue;
// indicator for end of file
static bool endOfFile = false;
// save arrays
static char consumerArray1[100];
static char consumerArray2[100];
void *Producer(void *ptr)
{
int i=0;
std::ifstream input("string.txt");
char temp;
while(input>>temp)
{
sem_wait(&mutex);
charQueue.push(temp);
sem_post(&mutex1);
sem_post(&mutex);
//counter++;
std::cout<<"Procuder Index: "<<i<<std::endl;
i++;
sleep(6);
}
endOfFile = true;
pthread_exit(NULL);
}
void *Consumer1(void *ptr)
{
std::cout<<"Entered consumer 1:"<<std::endl;
int i = 0;
sem_wait(&mutex1);
//while(charQueue.empty());
sem_post(&mutex1);
while(!endOfFile)// || !charQueue.empty())
{
sem_wait(&mutex1);
sem_wait(&mutex);
std::cout<<"Consumer1 index:"<<i<<" char: "<<charQueue.front()<<std::endl;
consumerArray1[i] = charQueue.front();
charQueue.pop();
//std::cout<<charQueue.size()<<std::endl;
sem_post(&mutex1);
i++;
//counter--;
sem_post(&mutex);
sleep(2);
}
consumerArray1[i] = '[=10=]';
pthread_exit(NULL);
}
void *Consumer2(void *ptr)
{
std::cout<<"Entered consumer 2:"<<std::endl;
int i = 0;
sem_wait(&mutex1);
//while(charQueue.empty());
sem_post(&mutex1);
while(!endOfFile)// || charQueue.empty())
{
sem_wait(&mutex1);
sem_wait(&mutex);
std::cout<<"Consumer2 index: "<<i<<" char: "<<charQueue.front()<<std::endl;
consumerArray2[i] = charQueue.front();
charQueue.pop();
sem_post(&mutex1);
i++;
//counter--;
sem_post(&mutex);
sleep(4);
}
consumerArray2[i] = '[=10=]';
pthread_exit(NULL);
}
int main()
{
pthread_t thread[3];
sem_init(&mutex,0,1);
sem_init(&mutex1,0,1);
pthread_create(&thread[0],NULL,Producer,NULL);
int rc = pthread_create(&thread[1],NULL,Consumer1,NULL);
if(rc)
{
std::cout<<"Thread not created"<<std::endl;
}
pthread_create(&thread[2],NULL,Consumer2,NULL);
pthread_join(thread[0],NULL);pthread_join(thread[1],NULL);pthread_join(thread[2],NULL);
std::cout<<"First array: "<<consumerArray1<<std::endl;
std::cout<<"Second array: "<<consumerArray2<<std::endl;
sem_destroy(&mutex);
sem_destroy(&mutex1);
pthread_exit(NULL);
}
编辑:我也在 charQueue.empty()
和 charQueue.push()
的访问周围添加了信号量,但输出没有变化。我还应该做什么?
您遇到了与之前相同的问题。您的 Consumer1
函数可以调用 charQueue.empty
,而您的 Producer
函数可以调用 charQueue.push(temp);
。您不能在一个线程中访问一个对象,而另一个线程正在或可能正在修改它。您需要使用互斥锁、信号量或其他形式的同步原语来保护 charQueue
。
同样,编译器可以像这样自由优化代码:
while(charQueue.empty());
像这样编码:
if (charQueue.empty()) while (1);
为什么?因为你的代码随时都可能在访问charQueue
。并且明确禁止一个线程修改一个对象而另一个线程可能正在访问它。因此,允许编译器假设在执行此循环时 charQueue
不会被修改,因此无需多次检查它是否为空。
您有信号量。使用它们来确保一次只有一个线程可能触及 charQueue
。
根据@DavidSchwartz 的指导,我编写了这段有效的代码。请建议我一个更好的方法来实现它,就好像有更好、更安全的方法来做我所做的事情一样。很抱歉我没有得到大部分评论和回答,因为这是我使用 pthreads 和信号量的第一个代码。所以,请耐心等待:
#include<iostream>
#include<pthread.h>
#include<fstream>
#include<unistd.h>
#include<semaphore.h>
#include<queue>
// define queue size
#define QUEUE_SIZE 5
// declare and initialize semaphore and read/write counter
static sem_t mutex,mutex1;
//static int counter = 0;
// Queue for saving characters
static std::queue<char> charQueue;
// indicator for end of file
static bool endOfFile = false;
// save arrays
static char consumerArray1[100];
static char consumerArray2[100];
void *Producer(void *ptr)
{
int i=0;
std::ifstream input("string.txt");
char temp;
while(input>>temp)
{
sem_wait(&mutex);
charQueue.push(temp);
sem_post(&mutex1);
sem_post(&mutex);
i++;
sleep(6);
}
endOfFile = true;
sem_post(&mutex1);
pthread_exit(NULL);
}
void *Consumer1(void *ptr)
{
int i = 0;
sem_wait(&mutex1);
bool loopCond = endOfFile;
while(!loopCond)
{
if(endOfFile)
{
loopCond = charQueue.empty();
std::cout<<loopCond<<std::endl;
sem_post(&mutex1);
}
sem_wait(&mutex1);
sem_wait(&mutex);
if(!charQueue.empty())
{
consumerArray1[i] = charQueue.front();
charQueue.pop();
i++;
}
if(charQueue.empty()&&endOfFile)
{
sem_post(&mutex);
sem_post(&mutex1);
break;
}
sem_post(&mutex);
sleep(2);
}
consumerArray1[i] = '[=10=]';
pthread_exit(NULL);
}
void *Consumer2(void *ptr)
{
int i = 0;
sem_wait(&mutex1);
bool loopCond = endOfFile;
while(!loopCond)
{
if(endOfFile)
{
loopCond = charQueue.empty();
std::cout<<loopCond<<std::endl;
sem_post(&mutex1);
}
sem_wait(&mutex1);
sem_wait(&mutex);
if(!charQueue.empty())
{
consumerArray2[i] = charQueue.front();
charQueue.pop();
i++;
}
if(charQueue.empty()&& endOfFile)
{
sem_post(&mutex);
sem_post(&mutex1);
break;
}
sem_post(&mutex);
sleep(4);
}
consumerArray2[i] = '[=10=]';
pthread_exit(NULL);
}
int main()
{
pthread_t thread[3];
sem_init(&mutex,0,1);
sem_init(&mutex1,0,1);
pthread_create(&thread[0],NULL,Producer,NULL);
int rc = pthread_create(&thread[1],NULL,Consumer1,NULL);
if(rc)
{
std::cout<<"Thread not created"<<std::endl;
}
pthread_create(&thread[2],NULL,Consumer2,NULL);
pthread_join(thread[0],NULL);pthread_join(thread[1],NULL);pthread_join(thread[2],NULL);
std::cout<<"First array: "<<consumerArray1<<std::endl;
std::cout<<"Second array: "<<consumerArray2<<std::endl;
sem_destroy(&mutex);
sem_destroy(&mutex1);
pthread_exit(NULL);
}
我正在尝试使用 pthread 和信号量在 C++ 中实现生产者和消费者问题。我有一个生产者和两个消费者。我的生产者从文件中读取一个字符串并将其逐个字符地存储在队列中。消费者也一个一个地从字符串中读取并存储在一个字符中。问题是我的 Consumer 中只有一个正在从队列中读取,另一个不是,它的数组仍然是空的。我该如何解决这个问题。这是我的程序:
#include<iostream>
#include<pthread.h>
#include<fstream>
#include<unistd.h>
#include<semaphore.h>
#include<queue>
// define queue size
#define QUEUE_SIZE 5
// declare and initialize semaphore and read/write counter
static sem_t mutex,mutex1;
//static int counter = 0;
// Queue for saving characters
static std::queue<char> charQueue;
// indicator for end of file
static bool endOfFile = false;
// save arrays
static char consumerArray1[100];
static char consumerArray2[100];
void *Producer(void *ptr)
{
int i=0;
std::ifstream input("string.txt");
char temp;
while(input>>temp)
{
sem_wait(&mutex);
charQueue.push(temp);
sem_post(&mutex1);
sem_post(&mutex);
//counter++;
std::cout<<"Procuder Index: "<<i<<std::endl;
i++;
sleep(6);
}
endOfFile = true;
pthread_exit(NULL);
}
void *Consumer1(void *ptr)
{
std::cout<<"Entered consumer 1:"<<std::endl;
int i = 0;
sem_wait(&mutex1);
//while(charQueue.empty());
sem_post(&mutex1);
while(!endOfFile)// || !charQueue.empty())
{
sem_wait(&mutex1);
sem_wait(&mutex);
std::cout<<"Consumer1 index:"<<i<<" char: "<<charQueue.front()<<std::endl;
consumerArray1[i] = charQueue.front();
charQueue.pop();
//std::cout<<charQueue.size()<<std::endl;
sem_post(&mutex1);
i++;
//counter--;
sem_post(&mutex);
sleep(2);
}
consumerArray1[i] = '[=10=]';
pthread_exit(NULL);
}
void *Consumer2(void *ptr)
{
std::cout<<"Entered consumer 2:"<<std::endl;
int i = 0;
sem_wait(&mutex1);
//while(charQueue.empty());
sem_post(&mutex1);
while(!endOfFile)// || charQueue.empty())
{
sem_wait(&mutex1);
sem_wait(&mutex);
std::cout<<"Consumer2 index: "<<i<<" char: "<<charQueue.front()<<std::endl;
consumerArray2[i] = charQueue.front();
charQueue.pop();
sem_post(&mutex1);
i++;
//counter--;
sem_post(&mutex);
sleep(4);
}
consumerArray2[i] = '[=10=]';
pthread_exit(NULL);
}
int main()
{
pthread_t thread[3];
sem_init(&mutex,0,1);
sem_init(&mutex1,0,1);
pthread_create(&thread[0],NULL,Producer,NULL);
int rc = pthread_create(&thread[1],NULL,Consumer1,NULL);
if(rc)
{
std::cout<<"Thread not created"<<std::endl;
}
pthread_create(&thread[2],NULL,Consumer2,NULL);
pthread_join(thread[0],NULL);pthread_join(thread[1],NULL);pthread_join(thread[2],NULL);
std::cout<<"First array: "<<consumerArray1<<std::endl;
std::cout<<"Second array: "<<consumerArray2<<std::endl;
sem_destroy(&mutex);
sem_destroy(&mutex1);
pthread_exit(NULL);
}
编辑:我也在 charQueue.empty()
和 charQueue.push()
的访问周围添加了信号量,但输出没有变化。我还应该做什么?
您遇到了与之前相同的问题。您的 Consumer1
函数可以调用 charQueue.empty
,而您的 Producer
函数可以调用 charQueue.push(temp);
。您不能在一个线程中访问一个对象,而另一个线程正在或可能正在修改它。您需要使用互斥锁、信号量或其他形式的同步原语来保护 charQueue
。
同样,编译器可以像这样自由优化代码:
while(charQueue.empty());
像这样编码:
if (charQueue.empty()) while (1);
为什么?因为你的代码随时都可能在访问charQueue
。并且明确禁止一个线程修改一个对象而另一个线程可能正在访问它。因此,允许编译器假设在执行此循环时 charQueue
不会被修改,因此无需多次检查它是否为空。
您有信号量。使用它们来确保一次只有一个线程可能触及 charQueue
。
根据@DavidSchwartz 的指导,我编写了这段有效的代码。请建议我一个更好的方法来实现它,就好像有更好、更安全的方法来做我所做的事情一样。很抱歉我没有得到大部分评论和回答,因为这是我使用 pthreads 和信号量的第一个代码。所以,请耐心等待:
#include<iostream>
#include<pthread.h>
#include<fstream>
#include<unistd.h>
#include<semaphore.h>
#include<queue>
// define queue size
#define QUEUE_SIZE 5
// declare and initialize semaphore and read/write counter
static sem_t mutex,mutex1;
//static int counter = 0;
// Queue for saving characters
static std::queue<char> charQueue;
// indicator for end of file
static bool endOfFile = false;
// save arrays
static char consumerArray1[100];
static char consumerArray2[100];
void *Producer(void *ptr)
{
int i=0;
std::ifstream input("string.txt");
char temp;
while(input>>temp)
{
sem_wait(&mutex);
charQueue.push(temp);
sem_post(&mutex1);
sem_post(&mutex);
i++;
sleep(6);
}
endOfFile = true;
sem_post(&mutex1);
pthread_exit(NULL);
}
void *Consumer1(void *ptr)
{
int i = 0;
sem_wait(&mutex1);
bool loopCond = endOfFile;
while(!loopCond)
{
if(endOfFile)
{
loopCond = charQueue.empty();
std::cout<<loopCond<<std::endl;
sem_post(&mutex1);
}
sem_wait(&mutex1);
sem_wait(&mutex);
if(!charQueue.empty())
{
consumerArray1[i] = charQueue.front();
charQueue.pop();
i++;
}
if(charQueue.empty()&&endOfFile)
{
sem_post(&mutex);
sem_post(&mutex1);
break;
}
sem_post(&mutex);
sleep(2);
}
consumerArray1[i] = '[=10=]';
pthread_exit(NULL);
}
void *Consumer2(void *ptr)
{
int i = 0;
sem_wait(&mutex1);
bool loopCond = endOfFile;
while(!loopCond)
{
if(endOfFile)
{
loopCond = charQueue.empty();
std::cout<<loopCond<<std::endl;
sem_post(&mutex1);
}
sem_wait(&mutex1);
sem_wait(&mutex);
if(!charQueue.empty())
{
consumerArray2[i] = charQueue.front();
charQueue.pop();
i++;
}
if(charQueue.empty()&& endOfFile)
{
sem_post(&mutex);
sem_post(&mutex1);
break;
}
sem_post(&mutex);
sleep(4);
}
consumerArray2[i] = '[=10=]';
pthread_exit(NULL);
}
int main()
{
pthread_t thread[3];
sem_init(&mutex,0,1);
sem_init(&mutex1,0,1);
pthread_create(&thread[0],NULL,Producer,NULL);
int rc = pthread_create(&thread[1],NULL,Consumer1,NULL);
if(rc)
{
std::cout<<"Thread not created"<<std::endl;
}
pthread_create(&thread[2],NULL,Consumer2,NULL);
pthread_join(thread[0],NULL);pthread_join(thread[1],NULL);pthread_join(thread[2],NULL);
std::cout<<"First array: "<<consumerArray1<<std::endl;
std::cout<<"Second array: "<<consumerArray2<<std::endl;
sem_destroy(&mutex);
sem_destroy(&mutex1);
pthread_exit(NULL);
}