消费者生产者与 pthreads 有竞争条件

Consumer-Producer having racing condition with pthreads

我正在使用 ubuntu 16.04 中的 pthreads 在具有 2 个处理器的虚拟机上实现消费者-生产者问题。我们有 n 个生产者和一个消费者,它通过缓冲区从每个生产者读取数据。每个生产者都有自己的缓冲区。问题是在某些时候我的代码死锁并且无法找出原因。可能是我弄乱了条件变量和互斥锁。

typedef struct
{
    Student*        buf;         // the buffer
    int             producer_id;
    size_t          len;         // number of items in the buffer
    pthread_mutex_t mutex;       // needed to add/remove data from the buffer
    pthread_cond_t  can_produce; // signaled when items are removed
    pthread_cond_t  can_consume; // signaled when items are added
    bool            isDone;
} buffer_t;

这是消费者和生产者之间数据传输的缓冲区结构。

void* producer(void *param)
{
    buffer_t* buffer = (buffer_t*)param;
    char*     line   = NULL;
    size_t    len    = 0;
    ssize_t   read;
    FILE*     inFileReader;

    inFileReader = fopen(inFile, "r");
    while ((read = getline(&line, &len, inFileReader)) != -1)
    {
        pthread_mutex_lock(&buffer->mutex);
        if (buffer->len == BUFFER_SIZE)
        {// buffer full wait
            pthread_cond_wait(&buffer->can_produce, &buffer->mutex);
        }

        //buffer filled up with the read data from the file.
        if (conditionValid)
        {
            //add item to buffer
            ++buffer->len;
        }

        pthread_cond_signal(&buffer->can_consume);
        pthread_mutex_unlock(&buffer->mutex);
    }

    fclose(inFileReader);
    buffer->isDone = true;
    pthread_exit(NULL);

    return NULL;
}

这是生产者,从给定的文件中读取数据并填充缓冲区,当文件结束时,生产者线程将退出。

void* consumer(void *param)
{
    buffer_t* buffer_array = (buffer_t*)param;
    sleep(rand() % 3);
    int i =0;

    while (!buffer_array[i%N].isDone || buffer_array[i%N].len != 0)
    {
        pthread_mutex_lock(&buffer_array[i%N].mutex);

        if (buffer_array[i%N].len == 0)
        {
            pthread_cond_wait(&buffer_array[i%N].can_consume, 
                              &buffer_array[i%N].mutex);
        }

        while (buffer_array[i%N].len != 0)
        {
            buffer_array[i%N].len--;
            //read from buffer
        }

        pthread_cond_signal(&buffer_array[i%N].can_produce);
        pthread_mutex_unlock(&buffer_array[i%N].mutex);
        i++;
    }

    fclose(outFileWriter);
    pthread_exit(NULL);

    return NULL;
} 

消费者从每个生产者读取缓冲区,当所有完成后消费者退出循环并终止线程。

感谢任何帮助。

死锁是因为生产者只设置了一个标志就完成了读取:

buffer->isDone = true;

因此,如果此时消费者正在等待生产者 1 条件:

    // say i == 1
    if (buffer_array[i%N].len == 0)
    {
        pthread_cond_wait(&buffer_array[i%N].can_consume, 
                          &buffer_array[i%N].mutex);
    }

条件永远不会触发。消费者没有进步,所以一切都只是停顿。

解决方案可能是在设置 isDone 后触发 can_consume