如何在 pthreads 中正确同步线程?

How to properly synchronous threads in pthreads?

我正在使用 pthreads 和信号量实现生产者-消费者问题。我有 1 个生产者和 2 个消费者。我的制作人从文件中一个一个地读取字符,并将它们排入循环队列。我希望消费者从队列中读取并存储到单独的数组中。我希望阅读的方式是第一个消费者读取 2 个字符,第二个消费者每 3 个字符读取一次。我正在尝试使用 pthread_cond_wait() 来执行此操作,但没有成功。这是我的代码:

#include<iostream>
#include<pthread.h>
#include<fstream>
#include<unistd.h>
#include<semaphore.h>
#include<queue>
#include "circular_queue"

// define queue size
#define QUEUE_SIZE 5

// declare and initialize semaphore and read/write counter
static sem_t mutex,queueEmptyMutex;
//static int counter = 0;

// Queue for saving characters
static Queue charQueue(QUEUE_SIZE);
//static std::queue<char> charQueue;

// indicator for end of file
static bool endOfFile = false;

// save arrays
static char consumerArray1[100];
static char consumerArray2[100];

static pthread_cond_t cond;
static pthread_mutex_t cond_mutex; 
static bool thirdCharToRead = false;

void *Producer(void *ptr)
{
    int i=0;
    std::ifstream input("string.txt");
    char temp;
    while(input>>temp)
    {
        std::cout<<"reached here a"<<std::endl;
        sem_wait(&mutex);
        std::cout<<"reached here b"<<std::endl;
        if(!charQueue.full())
        {
            charQueue.enQueue(temp);
        }
        sem_post(&queueEmptyMutex);
        sem_post(&mutex);

        i++;

        sleep(4);
    }

    endOfFile = true;
    sem_post(&queueEmptyMutex);
    pthread_exit(NULL);
}

void *Consumer1(void *ptr)
{

    int i = 0;
    sem_wait(&queueEmptyMutex);
    bool loopCond = endOfFile;
    while(!loopCond)
    {
        std::cout<<"consumer 1 loop"<<std::endl;
        if(endOfFile)
        {

            loopCond = charQueue.empty();
            std::cout<<loopCond<<std::endl;
            sem_post(&queueEmptyMutex);
        }
       sem_wait(&queueEmptyMutex);


        sem_wait(&mutex);


        if(!charQueue.empty())
        {

            consumerArray1[i] = charQueue.deQueue();
            i++;
            if(i%2==0)
            {
                pthread_mutex_lock(&cond_mutex);
                std::cout<<"Signal cond. i = "<<i<<std::endl;
                thirdCharToRead = true;
                pthread_mutex_unlock(&cond_mutex);
                pthread_cond_signal(&cond);

            }
        }        
        if(charQueue.empty()&&endOfFile)
        {

            sem_post(&mutex);
            sem_post(&queueEmptyMutex);

            break;
        }  
        sem_post(&mutex);

        sleep(2);
        std::cout<<"consumer 1 loop end"<<std::endl;
    }

    consumerArray1[i] = '[=10=]';
    pthread_exit(NULL);

}

void *Consumer2(void *ptr)
{

    int i = 0;
    sem_wait(&queueEmptyMutex);
    bool loopCond = endOfFile;
    while(!loopCond)
    {
        std::cout<<"consumer 2 loop"<<std::endl;
        if(endOfFile)
        {

            loopCond = charQueue.empty();
            std::cout<<loopCond<<std::endl;
            sem_post(&queueEmptyMutex);
        }
        sem_wait(&queueEmptyMutex);


        sem_wait(&mutex);


        if(!charQueue.empty())
        {
            pthread_mutex_lock(&cond_mutex);

            while(!thirdCharToRead)
            {
                std::cout<<"Waiting for condition"<<std::endl;
                pthread_cond_wait(&cond,&cond_mutex);
            }
            std::cout<<"Wait over"<<std::endl;
            thirdCharToRead = false;
            pthread_mutex_unlock(&cond_mutex);

            consumerArray2[i] = charQueue.deQueue();

            i++;
        }  
        if(charQueue.empty()&& endOfFile)
        {
            sem_post(&mutex);
            sem_post(&queueEmptyMutex);
            break;
        }  

        sem_post(&mutex);
        std::cout<<"consumer 2 loop end"<<std::endl;
        sleep(2);

    }

    consumerArray2[i] = '[=10=]';
    pthread_exit(NULL);

}

int main()
{
    pthread_t thread[3];
    sem_init(&mutex,0,1);
    sem_init(&queueEmptyMutex,0,1);
    pthread_mutex_init(&cond_mutex,NULL);
    pthread_cond_init(&cond,NULL);
    pthread_create(&thread[0],NULL,Producer,NULL);
    int rc = pthread_create(&thread[1],NULL,Consumer1,NULL);
    if(rc)
    {
        std::cout<<"Thread not created"<<std::endl;
    }
    pthread_create(&thread[2],NULL,Consumer2,NULL);
    pthread_join(thread[0],NULL);pthread_join(thread[1],NULL);pthread_join(thread[2],NULL);
    std::cout<<"First array: "<<consumerArray1<<std::endl;
    std::cout<<"Second array: "<<consumerArray2<<std::endl;
    sem_destroy(&mutex);
    sem_destroy(&queueEmptyMutex);
    pthread_exit(NULL);
}

我遇到的问题是在读取一次后,消费者 2 在 while(!thirdCharToRead) 中进入无限循环。有没有更好的方法来实现这个?

好的,让我们从这段代码开始:

        std::cout<<"Wait over"<<std::endl;
        pthread_mutex_unlock(&cond_mutex);
        thirdCharToRead = false;

此代码表示 cond_mutex 不保护 thirdCharToRead 免受并发访问。为什么?因为它修改 thirdCharToRead 而不持有那个互斥量。

现在看这段代码:

        pthread_mutex_lock(&cond_mutex);

        while(!thirdCharToRead)
        {
            std::cout<<"Waiting for condition"<<std::endl;
            pthread_cond_wait(&cond,&cond_mutex);
        }

现在,while 循环检查 thirdCharToRead,因此我们在测试时必须持有保护 thirdCharToRead 免受并发访问的锁。但是如果 thirdCharToRead 在整个循环中保持锁定状态,while 循环将永远循环,因为没有其他线程可以更改它。因此,只有在循环中的某处释放保护 thirdCharToRead 的锁时,这段代码才有意义,而我们在循环中释放的唯一锁是 pthread_cond_wait 调用中的 cond_mutex

所以这个代码只有在 cond_mutex 保护 thirdCharToRead 时才有意义。

休斯顿,我们有问题了。一段代码说 cond_mutex 不保护 thirdCharToRead 而一段代码说 cond_mutex 保护 thirdCharToRead.