消费者生产者与 pthreads 有竞争条件
Consumer-Producer having racing condition with pthreads
我正在使用 ubuntu 16.04 中的 pthreads 在具有 2 个处理器的虚拟机上实现消费者-生产者问题。我们有 n 个生产者和一个消费者,它通过缓冲区从每个生产者读取数据。每个生产者都有自己的缓冲区。问题是在某些时候我的代码死锁并且无法找出原因。可能是我弄乱了条件变量和互斥锁。
typedef struct
{
Student* buf; // the buffer
int producer_id;
size_t len; // number of items in the buffer
pthread_mutex_t mutex; // needed to add/remove data from the buffer
pthread_cond_t can_produce; // signaled when items are removed
pthread_cond_t can_consume; // signaled when items are added
bool isDone;
} buffer_t;
这是消费者和生产者之间数据传输的缓冲区结构。
void* producer(void *param)
{
buffer_t* buffer = (buffer_t*)param;
char* line = NULL;
size_t len = 0;
ssize_t read;
FILE* inFileReader;
inFileReader = fopen(inFile, "r");
while ((read = getline(&line, &len, inFileReader)) != -1)
{
pthread_mutex_lock(&buffer->mutex);
if (buffer->len == BUFFER_SIZE)
{// buffer full wait
pthread_cond_wait(&buffer->can_produce, &buffer->mutex);
}
//buffer filled up with the read data from the file.
if (conditionValid)
{
//add item to buffer
++buffer->len;
}
pthread_cond_signal(&buffer->can_consume);
pthread_mutex_unlock(&buffer->mutex);
}
fclose(inFileReader);
buffer->isDone = true;
pthread_exit(NULL);
return NULL;
}
这是生产者,从给定的文件中读取数据并填充缓冲区,当文件结束时,生产者线程将退出。
void* consumer(void *param)
{
buffer_t* buffer_array = (buffer_t*)param;
sleep(rand() % 3);
int i =0;
while (!buffer_array[i%N].isDone || buffer_array[i%N].len != 0)
{
pthread_mutex_lock(&buffer_array[i%N].mutex);
if (buffer_array[i%N].len == 0)
{
pthread_cond_wait(&buffer_array[i%N].can_consume,
&buffer_array[i%N].mutex);
}
while (buffer_array[i%N].len != 0)
{
buffer_array[i%N].len--;
//read from buffer
}
pthread_cond_signal(&buffer_array[i%N].can_produce);
pthread_mutex_unlock(&buffer_array[i%N].mutex);
i++;
}
fclose(outFileWriter);
pthread_exit(NULL);
return NULL;
}
消费者从每个生产者读取缓冲区,当所有完成后消费者退出循环并终止线程。
感谢任何帮助。
死锁是因为生产者只设置了一个标志就完成了读取:
buffer->isDone = true;
因此,如果此时消费者正在等待生产者 1 条件:
// say i == 1
if (buffer_array[i%N].len == 0)
{
pthread_cond_wait(&buffer_array[i%N].can_consume,
&buffer_array[i%N].mutex);
}
条件永远不会触发。消费者没有进步,所以一切都只是停顿。
解决方案可能是在设置 isDone 后触发 can_consume
。
我正在使用 ubuntu 16.04 中的 pthreads 在具有 2 个处理器的虚拟机上实现消费者-生产者问题。我们有 n 个生产者和一个消费者,它通过缓冲区从每个生产者读取数据。每个生产者都有自己的缓冲区。问题是在某些时候我的代码死锁并且无法找出原因。可能是我弄乱了条件变量和互斥锁。
typedef struct
{
Student* buf; // the buffer
int producer_id;
size_t len; // number of items in the buffer
pthread_mutex_t mutex; // needed to add/remove data from the buffer
pthread_cond_t can_produce; // signaled when items are removed
pthread_cond_t can_consume; // signaled when items are added
bool isDone;
} buffer_t;
这是消费者和生产者之间数据传输的缓冲区结构。
void* producer(void *param)
{
buffer_t* buffer = (buffer_t*)param;
char* line = NULL;
size_t len = 0;
ssize_t read;
FILE* inFileReader;
inFileReader = fopen(inFile, "r");
while ((read = getline(&line, &len, inFileReader)) != -1)
{
pthread_mutex_lock(&buffer->mutex);
if (buffer->len == BUFFER_SIZE)
{// buffer full wait
pthread_cond_wait(&buffer->can_produce, &buffer->mutex);
}
//buffer filled up with the read data from the file.
if (conditionValid)
{
//add item to buffer
++buffer->len;
}
pthread_cond_signal(&buffer->can_consume);
pthread_mutex_unlock(&buffer->mutex);
}
fclose(inFileReader);
buffer->isDone = true;
pthread_exit(NULL);
return NULL;
}
这是生产者,从给定的文件中读取数据并填充缓冲区,当文件结束时,生产者线程将退出。
void* consumer(void *param)
{
buffer_t* buffer_array = (buffer_t*)param;
sleep(rand() % 3);
int i =0;
while (!buffer_array[i%N].isDone || buffer_array[i%N].len != 0)
{
pthread_mutex_lock(&buffer_array[i%N].mutex);
if (buffer_array[i%N].len == 0)
{
pthread_cond_wait(&buffer_array[i%N].can_consume,
&buffer_array[i%N].mutex);
}
while (buffer_array[i%N].len != 0)
{
buffer_array[i%N].len--;
//read from buffer
}
pthread_cond_signal(&buffer_array[i%N].can_produce);
pthread_mutex_unlock(&buffer_array[i%N].mutex);
i++;
}
fclose(outFileWriter);
pthread_exit(NULL);
return NULL;
}
消费者从每个生产者读取缓冲区,当所有完成后消费者退出循环并终止线程。
感谢任何帮助。
死锁是因为生产者只设置了一个标志就完成了读取:
buffer->isDone = true;
因此,如果此时消费者正在等待生产者 1 条件:
// say i == 1
if (buffer_array[i%N].len == 0)
{
pthread_cond_wait(&buffer_array[i%N].can_consume,
&buffer_array[i%N].mutex);
}
条件永远不会触发。消费者没有进步,所以一切都只是停顿。
解决方案可能是在设置 isDone 后触发 can_consume
。