两端等待的消费者/生产者
Consumer / Producer with wait on both ends
我使用 mutex
和 condition
编写了一个 producer/consumer 程序。
它使用全局 int
来生产和消费价值。
有 1 个消费者线程和多个生产者线程。
规则:
当值太小,消费者会等待
当值太大时,生产者会等待
我的问题是:
我们知道消费者通常需要等待,但生产者要视情况而定。
在我的示例中,他们都需要检查条件,并且可能会互相等待,这是一个好的做法吗?
会不会在我下面的实现中造成死锁?
代码:
// condition test, a producer/consumer program,
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
static int glob = 0; // global variable, shared by threads,
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
/**
* increase once, with lock, it's producer,
*
* @param arg
* {max, point}
* where:
* max, max value that would increase to,
* point, is min value that would trigger consume,
*
* @return
* 0, not changed,
* >0, increased,
* <0, error,
*/
static void *inc(void *arg) {
int *args = (int *)arg;
int max = args[0];
int point = args[1];
int result;
int n = 0;
if((result = pthread_mutex_lock(&mtx)) != 0) { // lock
printf("error to get lock: %d\n", result);
pthread_exit(NULL); // terminate if error,
} else {
while(glob >= max) {
if((result = pthread_cond_wait(&cond, &mtx)) != 0) { // wait
printf("failed to wait for condition: %d\n", result);
return (void *)-1;
}
}
// do jobs,
glob++; // this will be compiled into multiple lines in machine code, so it's not automic,
n = 1;
/*
printf("inc by 1, %d\n", glob);
fflush(stdout);
*/
if(glob >= point) { // condition signal
if((result = pthread_cond_signal(&cond)) !=0 ) {
printf("error to condition signal: %d\n", result);
return (void *)-1;
} else {
// printf("condition signal, from thread [%d], value: %d\n", (int)pthread_self(), glob);
}
}
if((result = pthread_mutex_unlock(&mtx)) != 0) { // unlock
printf("error to unlock: %d\n", result);
return (void *)-1;
}
}
return (void *)n;
}
// increase loop,
static void *inc_loop(void *arg) {
int result;
while(1) {
if((result = (int)inc(arg)) < 0) {
return (void *)result;
}
}
}
/**
* consumer, with lock,
*
* @param arg
* {point, steps}
* where:
* point, is min value that would trigger consume,
* steps, is the count each consume would take,
*
* @return
* 0, not consumed,
* >0, consumed,
* <0, error,
*/
static void *consume(void *arg) {
int *args = (int *)arg;
int point = args[0];
int step = args[1];
int result;
int n = 0;
if((result = pthread_mutex_lock(&mtx)) != 0) { // lock
printf("error to get lock: %d\n", result);
pthread_exit(NULL); // terminate if error,
} else {
while(glob < point) {
pthread_cond_broadcast(&cond); // broadcast
printf("broadcast, and sleep,\n");
if((result = pthread_cond_wait(&cond, &mtx)) != 0) { // wait
printf("failed to wait for condition: %d\n", result);
return (void *)-1;
}
}
// do job
printf("going to perform consume: %d -> ", glob);
glob-=(glob>=step?step:glob);
printf("%d\n", glob);
n = 1;
if((result = pthread_mutex_unlock(&mtx)) != 0) { // unlock
printf("error to unlock: %d\n", result);
}
}
return (void *)n;
}
// condition test
int condition_test(void *(*func_inc_loop) (void *), void *(*func_consume) (void *), int thread_count, int max, int point, int consume_count, int step) {
pthread_t threads[thread_count];
int result, i;
int inc_args[] = {
max, // max value that would increase to,
point // min value that would trigger consume,
};
// start threads
for(i=0; i<thread_count; i++) {
if((result = pthread_create(threads+i, NULL, func_inc_loop, inc_args)) != 0) {
printf("error create thread [%d]: %d\n", i, result);
}
}
int loops = 0;
int consume_args[] = {
point, // min point to trigger consume,
step // consume steps
};
// begin consume loop,
while(loops < consume_count) {
if(func_consume(consume_args) > 0) {
loops++;
}
}
printf("\nDone.\n");
return 0;
}
/**
* command line:
* ./a.out <[thread_count]> <[max]> <[point]> <[consume_count]>
*/
int main(int argc, char *argv[]) {
int thread_count = 3;
int max = 1000;
int point = 100;
int consume_count = 10; // how many times consume execute,
int step = 200; // max count in each consume,
if(argc >= 2) {
thread_count = atoi(argv[1]);
}
if(argc >= 3) {
max = atoi(argv[2]);
}
if(argc >= 4) {
point = atoi(argv[3]);
}
if(argc >= 5) {
consume_count = atoi(argv[4]);
}
if(argc >= 6) {
step = atoi(argv[5]);
}
condition_test(&inc_loop, &consume, thread_count, max, point, consume_count, step);
return 0;
}
编译:
gcc -pthread xxx.c
执行:
./a.out
实际上,您不应该使用互斥锁来解决 producer/consumer 或 reader 编写器问题。它不一定会引起僵局,但可能导致生产者或消费者挨饿。
我使用了类似的方法编写 reader/writer 锁。
你可以看看:
https://github.com/prathammalik/OS161/blob/master/kern/thread/synch.c
我使用 mutex
和 condition
编写了一个 producer/consumer 程序。
它使用全局 int
来生产和消费价值。
有 1 个消费者线程和多个生产者线程。
规则:
当值太小,消费者会等待
当值太大时,生产者会等待
我的问题是:
我们知道消费者通常需要等待,但生产者要视情况而定。
在我的示例中,他们都需要检查条件,并且可能会互相等待,这是一个好的做法吗?
会不会在我下面的实现中造成死锁?
代码:
// condition test, a producer/consumer program,
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
static int glob = 0; // global variable, shared by threads,
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
/**
* increase once, with lock, it's producer,
*
* @param arg
* {max, point}
* where:
* max, max value that would increase to,
* point, is min value that would trigger consume,
*
* @return
* 0, not changed,
* >0, increased,
* <0, error,
*/
static void *inc(void *arg) {
int *args = (int *)arg;
int max = args[0];
int point = args[1];
int result;
int n = 0;
if((result = pthread_mutex_lock(&mtx)) != 0) { // lock
printf("error to get lock: %d\n", result);
pthread_exit(NULL); // terminate if error,
} else {
while(glob >= max) {
if((result = pthread_cond_wait(&cond, &mtx)) != 0) { // wait
printf("failed to wait for condition: %d\n", result);
return (void *)-1;
}
}
// do jobs,
glob++; // this will be compiled into multiple lines in machine code, so it's not automic,
n = 1;
/*
printf("inc by 1, %d\n", glob);
fflush(stdout);
*/
if(glob >= point) { // condition signal
if((result = pthread_cond_signal(&cond)) !=0 ) {
printf("error to condition signal: %d\n", result);
return (void *)-1;
} else {
// printf("condition signal, from thread [%d], value: %d\n", (int)pthread_self(), glob);
}
}
if((result = pthread_mutex_unlock(&mtx)) != 0) { // unlock
printf("error to unlock: %d\n", result);
return (void *)-1;
}
}
return (void *)n;
}
// increase loop,
static void *inc_loop(void *arg) {
int result;
while(1) {
if((result = (int)inc(arg)) < 0) {
return (void *)result;
}
}
}
/**
* consumer, with lock,
*
* @param arg
* {point, steps}
* where:
* point, is min value that would trigger consume,
* steps, is the count each consume would take,
*
* @return
* 0, not consumed,
* >0, consumed,
* <0, error,
*/
static void *consume(void *arg) {
int *args = (int *)arg;
int point = args[0];
int step = args[1];
int result;
int n = 0;
if((result = pthread_mutex_lock(&mtx)) != 0) { // lock
printf("error to get lock: %d\n", result);
pthread_exit(NULL); // terminate if error,
} else {
while(glob < point) {
pthread_cond_broadcast(&cond); // broadcast
printf("broadcast, and sleep,\n");
if((result = pthread_cond_wait(&cond, &mtx)) != 0) { // wait
printf("failed to wait for condition: %d\n", result);
return (void *)-1;
}
}
// do job
printf("going to perform consume: %d -> ", glob);
glob-=(glob>=step?step:glob);
printf("%d\n", glob);
n = 1;
if((result = pthread_mutex_unlock(&mtx)) != 0) { // unlock
printf("error to unlock: %d\n", result);
}
}
return (void *)n;
}
// condition test
int condition_test(void *(*func_inc_loop) (void *), void *(*func_consume) (void *), int thread_count, int max, int point, int consume_count, int step) {
pthread_t threads[thread_count];
int result, i;
int inc_args[] = {
max, // max value that would increase to,
point // min value that would trigger consume,
};
// start threads
for(i=0; i<thread_count; i++) {
if((result = pthread_create(threads+i, NULL, func_inc_loop, inc_args)) != 0) {
printf("error create thread [%d]: %d\n", i, result);
}
}
int loops = 0;
int consume_args[] = {
point, // min point to trigger consume,
step // consume steps
};
// begin consume loop,
while(loops < consume_count) {
if(func_consume(consume_args) > 0) {
loops++;
}
}
printf("\nDone.\n");
return 0;
}
/**
* command line:
* ./a.out <[thread_count]> <[max]> <[point]> <[consume_count]>
*/
int main(int argc, char *argv[]) {
int thread_count = 3;
int max = 1000;
int point = 100;
int consume_count = 10; // how many times consume execute,
int step = 200; // max count in each consume,
if(argc >= 2) {
thread_count = atoi(argv[1]);
}
if(argc >= 3) {
max = atoi(argv[2]);
}
if(argc >= 4) {
point = atoi(argv[3]);
}
if(argc >= 5) {
consume_count = atoi(argv[4]);
}
if(argc >= 6) {
step = atoi(argv[5]);
}
condition_test(&inc_loop, &consume, thread_count, max, point, consume_count, step);
return 0;
}
编译:
gcc -pthread xxx.c
执行:
./a.out
实际上,您不应该使用互斥锁来解决 producer/consumer 或 reader 编写器问题。它不一定会引起僵局,但可能导致生产者或消费者挨饿。
我使用了类似的方法编写 reader/writer 锁。
你可以看看: https://github.com/prathammalik/OS161/blob/master/kern/thread/synch.c