Linux pthread 生产者和消费者
Linux pthread Producers and Consumers
出于学术目的,我必须编写一个 POSIX 基于线程和信号量的生产者和消费者问题的实现。为了检查实现是否有效,我对所有生产的和消耗的所有 'goods' 求和。问题是第二个总和在程序的后续执行之间有所不同,并不总是等于生产商品的数量。我使用固定大小的循环缓冲区来保存生成的值,2 个信号量让生产者和消费者进入临界区,2 个互斥锁来访问生产者和消费者的索引。
这是我的代码:
#include<unistd.h>
#include<pthread.h>
#include<semaphore.h>
#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<time.h>
#define PRODUCERS_COUNT 10
#define CONSUMERS_COUNT 5
#define BUFFER_SIZE 2
#define ITERATIONS 1000
int buffer[BUFFER_SIZE];
pthread_mutex_t write_index_mutex;
pthread_mutex_t read_index_mutex;
sem_t producer_semaphore;
sem_t consumer_semaphore;
int write_index = 0;
int read_index = 0;
int total_produced_sum = 0;
int total_consumed_sum = 0;
void* producer_thread(void* args)
{
int producer_id = *((int*)args);
free(args);
int my_write_index;
int iterations = ITERATIONS;
while(iterations--)
{
sem_wait(&producer_semaphore);
pthread_mutex_lock(&write_index_mutex);
my_write_index = write_index;
write_index = (write_index + 1) % BUFFER_SIZE;
total_produced_sum += producer_id;
pthread_mutex_unlock(&write_index_mutex);
buffer[my_write_index] = producer_id;
sem_post(&consumer_semaphore);
usleep((rand() % 10));
}
return NULL;
}
void* consumer_thread(void* args)
{
int my_read_index;
while(1)
{
sem_wait(&consumer_semaphore);
pthread_mutex_lock(&read_index_mutex);
my_read_index = read_index;
read_index = (read_index + 1) % BUFFER_SIZE;
total_consumed_sum += buffer[my_read_index];
pthread_mutex_unlock(&read_index_mutex);
sem_post(&producer_semaphore);
usleep((rand() % 10));
}
return NULL;
}
int main()
{
int i;
int *id;
pthread_t producers[PRODUCERS_COUNT];
pthread_t consumers[CONSUMERS_COUNT];
sem_init(&producer_semaphore, 0, BUFFER_SIZE);
sem_init(&consumer_semaphore, 0, 0);
pthread_mutex_init(&write_index_mutex, NULL);
pthread_mutex_init(&read_index_mutex, NULL);
for(i = 0 ; i < PRODUCERS_COUNT ; i++)
{
id = (int*)malloc(sizeof(int));
*id = i+1;
pthread_create(&producers[i], 0, producer_thread, (void*)id);
}
for(i = 0; i < CONSUMERS_COUNT; i++)
{
pthread_create(&consumers[i], 0, consumer_thread, NULL);
}
for(i = 0; i < PRODUCERS_COUNT; i++)
{
pthread_join(producers[i], NULL);
}
while(1)
{
sem_getvalue(&consumer_semaphore, &i);
if(i == 0)
break;
}
printf("Goods produced: %d goods consumed: %d\n", total_produced_sum, total_consumed_sum);
return 0;
}
下面是同一程序运行 10 次后的一些示例输出,没有重新编译:
Goods produced: 55000 goods consumed: 54996
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 54998
Goods produced: 55000 goods consumed: 55003
Goods produced: 55000 goods consumed: 54998
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 55008
Goods produced: 55000 goods consumed: 54999
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 55000
是否存在任何逻辑或实现错误导致这些和不相等?
生产者不正确
考虑一下您的制作人:
sem_wait(&producer_semaphore);
pthread_mutex_lock(&write_index_mutex);
my_write_index = write_index;
write_index = (write_index + 1) % BUFFER_SIZE;
total_produced_sum += producer_id;
pthread_mutex_unlock(&write_index_mutex);
// Producer #1 stops right here.
buffer[my_write_index] = producer_id;
sem_post(&consumer_semaphore);
让我们假设发生以下序列:
- Producer #1 一直运行到上面的评论,然后停止。假设它的
my_write_index
为 0。它已经将 write_index
递增到 1,但尚未向 buffer[0]
. 写入任何内容
- 现在,生产者 #2 运行整个代码。它的
my_write_index
为 1,将其 id 写入 buffer[1]
,然后发布到 consumer_semaphore
。
- 接下来,消费者 #1 获取
consumer_semaphore
,因为它是由生产者 #2 发布的。它试图消耗一个元素,但下一个 read_index
是 0。不幸的是,buffer[0]
尚未被生产者 #1 写入,并且具有之前留下的任何未知值。
要修复您的代码,您需要修改生产者以在释放互斥量之前写入缓冲区。也就是说,调换这两行的顺序:
pthread_mutex_unlock(&write_index_mutex);
buffer[my_write_index] = producer_id;
出于学术目的,我必须编写一个 POSIX 基于线程和信号量的生产者和消费者问题的实现。为了检查实现是否有效,我对所有生产的和消耗的所有 'goods' 求和。问题是第二个总和在程序的后续执行之间有所不同,并不总是等于生产商品的数量。我使用固定大小的循环缓冲区来保存生成的值,2 个信号量让生产者和消费者进入临界区,2 个互斥锁来访问生产者和消费者的索引。 这是我的代码:
#include<unistd.h>
#include<pthread.h>
#include<semaphore.h>
#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<time.h>
#define PRODUCERS_COUNT 10
#define CONSUMERS_COUNT 5
#define BUFFER_SIZE 2
#define ITERATIONS 1000
int buffer[BUFFER_SIZE];
pthread_mutex_t write_index_mutex;
pthread_mutex_t read_index_mutex;
sem_t producer_semaphore;
sem_t consumer_semaphore;
int write_index = 0;
int read_index = 0;
int total_produced_sum = 0;
int total_consumed_sum = 0;
void* producer_thread(void* args)
{
int producer_id = *((int*)args);
free(args);
int my_write_index;
int iterations = ITERATIONS;
while(iterations--)
{
sem_wait(&producer_semaphore);
pthread_mutex_lock(&write_index_mutex);
my_write_index = write_index;
write_index = (write_index + 1) % BUFFER_SIZE;
total_produced_sum += producer_id;
pthread_mutex_unlock(&write_index_mutex);
buffer[my_write_index] = producer_id;
sem_post(&consumer_semaphore);
usleep((rand() % 10));
}
return NULL;
}
void* consumer_thread(void* args)
{
int my_read_index;
while(1)
{
sem_wait(&consumer_semaphore);
pthread_mutex_lock(&read_index_mutex);
my_read_index = read_index;
read_index = (read_index + 1) % BUFFER_SIZE;
total_consumed_sum += buffer[my_read_index];
pthread_mutex_unlock(&read_index_mutex);
sem_post(&producer_semaphore);
usleep((rand() % 10));
}
return NULL;
}
int main()
{
int i;
int *id;
pthread_t producers[PRODUCERS_COUNT];
pthread_t consumers[CONSUMERS_COUNT];
sem_init(&producer_semaphore, 0, BUFFER_SIZE);
sem_init(&consumer_semaphore, 0, 0);
pthread_mutex_init(&write_index_mutex, NULL);
pthread_mutex_init(&read_index_mutex, NULL);
for(i = 0 ; i < PRODUCERS_COUNT ; i++)
{
id = (int*)malloc(sizeof(int));
*id = i+1;
pthread_create(&producers[i], 0, producer_thread, (void*)id);
}
for(i = 0; i < CONSUMERS_COUNT; i++)
{
pthread_create(&consumers[i], 0, consumer_thread, NULL);
}
for(i = 0; i < PRODUCERS_COUNT; i++)
{
pthread_join(producers[i], NULL);
}
while(1)
{
sem_getvalue(&consumer_semaphore, &i);
if(i == 0)
break;
}
printf("Goods produced: %d goods consumed: %d\n", total_produced_sum, total_consumed_sum);
return 0;
}
下面是同一程序运行 10 次后的一些示例输出,没有重新编译:
Goods produced: 55000 goods consumed: 54996
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 54998
Goods produced: 55000 goods consumed: 55003
Goods produced: 55000 goods consumed: 54998
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 55008
Goods produced: 55000 goods consumed: 54999
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 55000
Goods produced: 55000 goods consumed: 55000
是否存在任何逻辑或实现错误导致这些和不相等?
生产者不正确
考虑一下您的制作人:
sem_wait(&producer_semaphore);
pthread_mutex_lock(&write_index_mutex);
my_write_index = write_index;
write_index = (write_index + 1) % BUFFER_SIZE;
total_produced_sum += producer_id;
pthread_mutex_unlock(&write_index_mutex);
// Producer #1 stops right here.
buffer[my_write_index] = producer_id;
sem_post(&consumer_semaphore);
让我们假设发生以下序列:
- Producer #1 一直运行到上面的评论,然后停止。假设它的
my_write_index
为 0。它已经将write_index
递增到 1,但尚未向buffer[0]
. 写入任何内容
- 现在,生产者 #2 运行整个代码。它的
my_write_index
为 1,将其 id 写入buffer[1]
,然后发布到consumer_semaphore
。 - 接下来,消费者 #1 获取
consumer_semaphore
,因为它是由生产者 #2 发布的。它试图消耗一个元素,但下一个read_index
是 0。不幸的是,buffer[0]
尚未被生产者 #1 写入,并且具有之前留下的任何未知值。
要修复您的代码,您需要修改生产者以在释放互斥量之前写入缓冲区。也就是说,调换这两行的顺序:
pthread_mutex_unlock(&write_index_mutex);
buffer[my_write_index] = producer_id;