Linux pthread Producers and Consumers

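For academic purposes, I have to write an implementation of the producers and consumers problem based on POSIX threads and semaphores. To check whether the implementation works, I sum all the 'goods' produced and all the 'goods' consumed. The problem is that the second sum varies between successive runs of the program and is not always equal to the sum of goods produced. I use a fixed-size circular buffer to hold the produced values, 2 semaphores to let producers and consumers into the critical section, and 2 mutexes to guard the producers' and consumers' indexes. Here is my code: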

#include<unistd.h>
#include<pthread.h>
#include<semaphore.h>
#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<time.h>
#define PRODUCERS_COUNT 10
#define CONSUMERS_COUNT 5
#define BUFFER_SIZE 2
#define ITERATIONS 1000
int buffer[BUFFER_SIZE];
pthread_mutex_t write_index_mutex;
pthread_mutex_t read_index_mutex;

sem_t producer_semaphore;
sem_t consumer_semaphore;

int write_index = 0;
int read_index = 0;

int total_produced_sum = 0;
int total_consumed_sum = 0;

void* producer_thread(void* args)
{
    int producer_id = *((int*)args);
    free(args);
    int my_write_index;
    int iterations = ITERATIONS;
    while(iterations--)
    {
        sem_wait(&producer_semaphore);
        pthread_mutex_lock(&write_index_mutex);
        my_write_index = write_index;
        write_index = (write_index + 1) % BUFFER_SIZE;
        total_produced_sum += producer_id;
        pthread_mutex_unlock(&write_index_mutex);
        buffer[my_write_index] = producer_id;
        sem_post(&consumer_semaphore);
        usleep((rand() % 10)); 
    }
    return NULL;
}

void* consumer_thread(void* args)
{
    int my_read_index;
    while(1)
    {
        sem_wait(&consumer_semaphore);
        pthread_mutex_lock(&read_index_mutex);
        my_read_index = read_index;
        read_index = (read_index + 1) % BUFFER_SIZE;
        total_consumed_sum += buffer[my_read_index];
        pthread_mutex_unlock(&read_index_mutex);
        sem_post(&producer_semaphore);
        usleep((rand() % 10));
    }
    return NULL;
}

int main()
{
    int i;
    int *id;
    pthread_t producers[PRODUCERS_COUNT];
    pthread_t consumers[CONSUMERS_COUNT];
    sem_init(&producer_semaphore, 0, BUFFER_SIZE);
    sem_init(&consumer_semaphore, 0, 0);
    pthread_mutex_init(&write_index_mutex, NULL);
    pthread_mutex_init(&read_index_mutex, NULL);
    for(i = 0 ; i < PRODUCERS_COUNT ; i++)
    {
        id = (int*)malloc(sizeof(int));
        *id = i+1;
        pthread_create(&producers[i], 0, producer_thread, (void*)id);
    }
    for(i = 0; i < CONSUMERS_COUNT; i++)
    {
        pthread_create(&consumers[i], 0, consumer_thread, NULL);
    }
    for(i = 0; i < PRODUCERS_COUNT; i++)
    {
        pthread_join(producers[i], NULL);
    }
    while(1)
    {
        sem_getvalue(&consumer_semaphore, &i);
        if(i == 0)
            break;
    }
    printf("Goods produced: %d goods consumed: %d\n", total_produced_sum, total_consumed_sum);
    return 0;
}

Here is some sample output from repeated runs of the same program, without recompiling:

Goods produced: 55000 goods consumed: 54996
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 54998
Goods produced: 55000 goods consumed: 55003
Goods produced: 55000 goods consumed: 54998
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 55008
Goods produced: 55000 goods consumed: 54999
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 55000

Is there a logic or implementation error that causes these sums to be unequal?

The producer is incorrect

Consider your producer:

    sem_wait(&producer_semaphore);
    pthread_mutex_lock(&write_index_mutex);
    my_write_index = write_index;
    write_index = (write_index + 1) % BUFFER_SIZE;
    total_produced_sum += producer_id;
    pthread_mutex_unlock(&write_index_mutex);

    // Producer #1 stops right here.

    buffer[my_write_index] = producer_id;
    sem_post(&consumer_semaphore);

Suppose the following sequence occurs:

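  1. Producer #1 runs up to the comment above and then stops. Suppose its my_write_index is 0. It has already incremented write_index to 1, but it has not yet written anything to buffer[0].
  2. Now producer #2 runs through the entire code. Its my_write_index is 1, it writes its id to buffer[1], and then posts to consumer_semaphore.
  3. Next, consumer #1 acquires consumer_semaphore, because producer #2 posted to it. It tries to consume an element, but the next read_index is 0. Unfortunately, buffer[0] has not yet been written by producer #1 and still holds whatever value was left there before.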
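To make the interleaving above concrete, here is a minimal single-threaded sketch that replays it by hand. It is not your program: `replay_race`, `STALE_VALUE`, and `items_available` (a plain counter standing in for consumer_semaphore) are names I made up for illustration, and the stale value 7 is an arbitrary assumption.

```c
#include <assert.h>
#include <stdio.h>

#define BUFFER_SIZE 2
#define STALE_VALUE 7   /* hypothetical leftover from an earlier round */

/* Replays the problematic interleaving on one thread and returns the
 * value the consumer ends up reading. */
int replay_race(void)
{
    int buffer[BUFFER_SIZE] = { STALE_VALUE, STALE_VALUE };
    int write_index = 0;
    int read_index = 0;
    int items_available = 0;   /* stands in for consumer_semaphore */

    /* Producer #1 reserves slot 0, then is preempted before writing. */
    int p1_index = write_index;
    write_index = (write_index + 1) % BUFFER_SIZE;

    /* Producer #2 reserves slot 1, writes its id, and posts. */
    int p2_index = write_index;
    write_index = (write_index + 1) % BUFFER_SIZE;
    buffer[p2_index] = 2;
    items_available++;

    /* Consumer #1 gets past the semaphore and reads slot 0: still stale. */
    assert(items_available > 0);
    items_available--;
    int consumed = buffer[read_index];
    read_index = (read_index + 1) % BUFFER_SIZE;

    /* Producer #1 resumes and writes its id, but the read already happened. */
    buffer[p1_index] = 1;
    items_available++;

    printf("consumer read %d, producer #1 meant to deliver %d "
           "(items pending: %d, next read index: %d)\n",
           consumed, buffer[p1_index], items_available, read_index);
    return consumed;
}
```

Calling `replay_race()` returns 7 rather than producer #1's id of 1. Since the stale value can be larger or smaller than the id that should have been read, the consumed sum can drift in either direction, which matches the output you posted.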

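To fix your code, modify the producer so that it writes to the buffer before releasing the mutex. In other words, swap the order of these two lines: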

    pthread_mutex_unlock(&write_index_mutex);
    buffer[my_write_index] = producer_id;
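For reference, here is a rough sketch of the whole program with that swap applied. A few things in it are my own assumptions rather than part of your code: each consumer takes a fixed share of the items (`ITEMS_PER_CONSUMER`) so that it can exit and be joined before the sums are read, the `usleep` calls are dropped, the producer id is passed through the pointer argument instead of `malloc`, and everything is wrapped in a hypothetical `run_fixed_demo` function.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>

#define PRODUCERS_COUNT 10
#define CONSUMERS_COUNT 5
#define BUFFER_SIZE 2
#define ITERATIONS 1000
/* Assumption: equal share per consumer, so consumers can finish and be joined. */
#define ITEMS_PER_CONSUMER (PRODUCERS_COUNT * ITERATIONS / CONSUMERS_COUNT)

static int buffer[BUFFER_SIZE];
static int write_index, read_index;
static long total_produced_sum, total_consumed_sum;
static pthread_mutex_t write_index_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t read_index_mutex = PTHREAD_MUTEX_INITIALIZER;
static sem_t producer_semaphore, consumer_semaphore;

static void *producer_thread(void *arg)
{
    int producer_id = (int)(intptr_t)arg;
    for (int i = 0; i < ITERATIONS; i++) {
        sem_wait(&producer_semaphore);            /* wait for a free slot */
        pthread_mutex_lock(&write_index_mutex);
        buffer[write_index] = producer_id;        /* write BEFORE unlocking */
        write_index = (write_index + 1) % BUFFER_SIZE;
        total_produced_sum += producer_id;
        pthread_mutex_unlock(&write_index_mutex);
        sem_post(&consumer_semaphore);            /* slot is fully written now */
    }
    return NULL;
}

static void *consumer_thread(void *arg)
{
    (void)arg;
    for (int i = 0; i < ITEMS_PER_CONSUMER; i++) {
        sem_wait(&consumer_semaphore);            /* wait for a filled slot */
        pthread_mutex_lock(&read_index_mutex);
        total_consumed_sum += buffer[read_index];
        read_index = (read_index + 1) % BUFFER_SIZE;
        pthread_mutex_unlock(&read_index_mutex);
        sem_post(&producer_semaphore);            /* slot is free again */
    }
    return NULL;
}

/* Runs one full simulation and reports both sums through the out-parameters. */
void run_fixed_demo(long *produced, long *consumed)
{
    pthread_t producers[PRODUCERS_COUNT], consumers[CONSUMERS_COUNT];
    int rc;

    write_index = read_index = 0;
    total_produced_sum = total_consumed_sum = 0;
    sem_init(&producer_semaphore, 0, BUFFER_SIZE);
    sem_init(&consumer_semaphore, 0, 0);

    for (int i = 0; i < PRODUCERS_COUNT; i++) {
        rc = pthread_create(&producers[i], NULL, producer_thread,
                            (void *)(intptr_t)(i + 1));
        assert(rc == 0);
    }
    for (int i = 0; i < CONSUMERS_COUNT; i++) {
        rc = pthread_create(&consumers[i], NULL, consumer_thread, NULL);
        assert(rc == 0);
    }
    (void)rc;

    /* Join everything so both sums are final before they are read. */
    for (int i = 0; i < PRODUCERS_COUNT; i++)
        pthread_join(producers[i], NULL);
    for (int i = 0; i < CONSUMERS_COUNT; i++)
        pthread_join(consumers[i], NULL);

    *produced = total_produced_sum;
    *consumed = total_consumed_sum;
    sem_destroy(&producer_semaphore);
    sem_destroy(&consumer_semaphore);
}
```

Calling `run_fixed_demo(&produced, &consumed)` from `main` should report 55000 for both sums on every run, since ids 1 through 10 are each produced 1000 times. Joining the consumers is a design choice of this sketch: it guarantees the last addition to total_consumed_sum has finished before the sums are read.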