两端等待的消费者/生产者

Consumer / Producer with wait on both ends

我使用 mutexcondition 编写了一个 producer/consumer 程序。 它使用全局 int 来生产和消费价值。 有 1 个消费者线程和多个生产者线程。

规则:

  1. 当值太小,消费者会等待

  2. 当值太大时,生产者会等待

我的问题是:

我们知道消费者通常需要等待,但生产者要视情况而定。
在我的示例中,他们都需要检查条件,并且可能会互相等待,这是一个好的做法吗?
会不会在我下面的实现中造成死锁?

代码:

// condition test, a producer/consumer program,

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

static int glob = 0; // global variable, shared by threads,
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

/**
 * increase once, with lock, it's producer,
 * 
 * @param arg
 *  {max, point}
 *  where:
 *      max, max value that would increase to,
 *      point, is min value that would trigger consume,
 *  
 * @return
 *  0, not changed,
 *  >0, increased,
 *  <0, error,
 */
static void *inc(void *arg) {
    int *args = (int *)arg;
    int max = args[0];
    int point = args[1];

    int result;
    int n = 0;

    if((result = pthread_mutex_lock(&mtx)) != 0) { // lock
        printf("error to get lock: %d\n", result);
        pthread_exit(NULL); // terminate if error,
    } else {
        while(glob >= max) {
            if((result = pthread_cond_wait(&cond, &mtx)) != 0) { // wait
                printf("failed to wait for condition: %d\n", result);
                return (void *)-1;
            }
        }

        // do jobs,
        glob++; // this will be compiled into multiple lines in machine code, so it's not automic,
        n = 1;
        /*
        printf("inc by 1, %d\n", glob);
        fflush(stdout);
        */

        if(glob >= point) { // condition signal
            if((result = pthread_cond_signal(&cond)) !=0 ) {
                printf("error to condition signal: %d\n", result);
                return (void *)-1;
            } else {
                // printf("condition signal, from thread [%d], value: %d\n", (int)pthread_self(),  glob);
            }
        }

        if((result = pthread_mutex_unlock(&mtx)) != 0) { // unlock
            printf("error to unlock: %d\n", result);
            return (void *)-1;
        }
    }

    return (void *)n;
}

// increase loop,
static void *inc_loop(void *arg) {
    int result;
    while(1) {
        if((result = (int)inc(arg)) < 0) {
            return (void *)result;
        }
    }
}

/**
 * consumer, with lock,
 * 
 * @param arg
 *  {point, steps}
 *  where:
 *      point, is min value that would trigger consume,
 *      steps, is the count each consume would take,
 *  
 * @return
 *  0, not consumed,
 *  >0, consumed,
 *  <0, error,
 */
static void *consume(void *arg) {
    int *args = (int *)arg;
    int point = args[0];
    int step = args[1];
    int result;
    int n = 0;

    if((result = pthread_mutex_lock(&mtx)) != 0) { // lock
        printf("error to get lock: %d\n", result);
        pthread_exit(NULL); // terminate if error,
    } else {
        while(glob < point) {
            pthread_cond_broadcast(&cond); // broadcast
            printf("broadcast, and sleep,\n");

            if((result = pthread_cond_wait(&cond, &mtx)) != 0) { // wait
                printf("failed to wait for condition: %d\n", result);
                return (void *)-1;
            }
        }

        // do job
        printf("going to perform consume: %d -> ", glob);
        glob-=(glob>=step?step:glob);
        printf("%d\n", glob);
        n = 1;

        if((result = pthread_mutex_unlock(&mtx)) != 0) { // unlock
            printf("error to unlock: %d\n", result);
        }
    }

    return (void *)n;
}

// condition test
int condition_test(void *(*func_inc_loop) (void *), void *(*func_consume) (void *), int thread_count, int max, int point, int consume_count, int step) {
    pthread_t threads[thread_count];

    int result, i;
    int inc_args[] = {
        max, // max value that would increase to,
        point // min value that would trigger consume,
    };

    // start threads
    for(i=0; i<thread_count; i++) {
        if((result = pthread_create(threads+i, NULL, func_inc_loop, inc_args)) != 0) {
            printf("error create thread [%d]: %d\n", i, result);
        }
    }

    int loops = 0;
    int consume_args[] = {
        point, // min point to trigger consume,
        step // consume steps
    };

    // begin consume loop,
    while(loops < consume_count) {
        if(func_consume(consume_args) > 0) {
            loops++;
        }
    }

    printf("\nDone.\n");

    return 0;
}

/**
 * command line:
 *  ./a.out <[thread_count]> <[max]> <[point]> <[consume_count]>
 */
int main(int argc, char *argv[]) {
    int thread_count = 3;
    int max = 1000;
    int point = 100;
    int consume_count = 10; // how many times consume execute,
    int step = 200; // max count in each consume,

    if(argc >= 2) {
        thread_count = atoi(argv[1]);
    }
    if(argc >= 3) {
        max = atoi(argv[2]);
    }
    if(argc >= 4) {
        point = atoi(argv[3]);
    }
    if(argc >= 5) {
        consume_count = atoi(argv[4]);
    }
    if(argc >= 6) {
        step = atoi(argv[5]);
    }

    condition_test(&inc_loop, &consume, thread_count, max, point, consume_count, step);

    return 0;
}

编译:

gcc -pthread xxx.c

执行:

./a.out

实际上,您不应该使用互斥锁来解决 producer/consumer 或 reader 编写器问题。它不一定会引起僵局,但可能导致生产者或消费者挨饿。

我使用了类似的方法编写 reader/writer 锁。

你可以看看: https://github.com/prathammalik/OS161/blob/master/kern/thread/synch.c