是否可以使用条件变量和信号而不是广播来创建生产者消费者问题?

Is it possible to create the producer consumer problem with condition variables and signal instead of broadcast?

我正在尝试 运行 使用 pthread_cond_signal() 而不是 pthread_cond_broadcast() 的生产者-消费者问题,但是,我尝试了很多事情,但似乎无法做到(如果我选择 n 生产者和 one 消费者,那么消费者完成但并非所有生产者都完成,有些人卡在完整队列中所以问题永远不会完成执行。我从某人那里得到了 initial code else GitHub 并且我正在尝试对其进行编辑以实现该目的(您可以查看附件 link 上的代码,我会将其粘贴在 post 的末尾)。相关部分这是生产者和消费者功能,目前我有以下内容:

消费者:

void *consumer (void *carg)
{
  queue  *fifo;
  int     item_consumed;
  pcdata *mydata;
  int     my_tid;
  int    *total_consumed;

  mydata = (pcdata *) carg;

  fifo           = mydata->q;
  total_consumed = mydata->count;
  my_tid         = mydata->tid;

  while (1) {
     pthread_mutex_lock(fifo->mutex); //start of the critical section
 
    while (fifo->empty && *total_consumed != WORK_MAX) {
      printf ("con %d:   EMPTY.\n", my_tid);
      pthread_cond_wait(fifo->notEmpty, fifo->mutex); //if queue is empty then wait for signal that it has something to start consuming
    }

 
    if (*total_consumed >= WORK_MAX) {
       pthread_mutex_unlock(fifo->mutex); //if max work is reached then unlock the mutex and exit
      break;
    }
    queueRemove (fifo, &item_consumed); //reaching this means that queue isn\t empty so just consume
    (*total_consumed)++;
    do_work(CONSUMER_CPU,CONSUMER_CPU);

    printf ("con %d:   %d.\n", my_tid, item_consumed);

    pthread_cond_signal(fifo->notFull); //consumption is done so queue isn't full, signal to producer
    pthread_mutex_unlock(fifo->mutex);
  }

  printf("con %d:   exited\n", my_tid);
  return (NULL);
}

制作人:

void *producer (void *parg)
{
  queue  *fifo;
  int     item_produced;
  pcdata *mydata;
  int     my_tid;
  int    *total_produced;

  mydata = (pcdata *) parg;

  fifo           = mydata->q;
  total_produced = mydata->count;
  my_tid         = mydata->tid;

  while (1) {
      
    pthread_mutex_lock(fifo->mutex);
    do_work(PRODUCER_CPU, PRODUCER_BLOCK);
 
  
    while (fifo->full && *total_produced != WORK_MAX) {
      printf ("prod %d:  FULL.\n", my_tid);
      pthread_cond_wait(fifo->notFull, fifo->mutex); //if queue is full then wait for signal that is not anyone to start producing
  
    }

    if (*total_produced >= WORK_MAX) {
      pthread_mutex_unlock(fifo->mutex); //if total work reached then exit
      break;
    }

    item_produced = (*total_produced)++;
    queueAdd (fifo, item_produced);


    printf("prod %d:  %d.\n", my_tid, item_produced);
    pthread_cond_signal(fifo->notEmpty); //reaching this means we produced something so signal that queue is not empty
    pthread_mutex_unlock(fifo->mutex);
  }

  printf("prod %d:  exited\n", my_tid);
  return (NULL);
}

我的问题是并非所有线程都收到退出信号,例如 运行最多 13 个生产者和一个消费者工作,但是超过 13 个生产者和一个消费者的所有线程都卡住了,这是一个执行运行宁 14 个生产者与一个消费者的样本:

con 0:   EMPTY.
prod 3:  2.
prod 9:  FULL.
prod 11:  FULL.
prod 13:  FULL.
prod 0:  0.
prod 8:  3.
prod 6:  1.
prod 12:  FULL.
prod 7:  4.
prod 10:  FULL.
prod 1:  FULL.
prod 2:  FULL.
prod 5:  FULL.
prod 4:  FULL.
prod 9:  5.
prod 3:  FULL.
prod 0:  FULL.
prod 6:  FULL.
prod 8:  FULL.
prod 7:  FULL.
prod 9:  FULL.
con 0:   0.
prod 11:  6.
prod 13:  FULL.
prod 11:  FULL.
con 0:   1.
prod 12:  7.
prod 10:  FULL.
prod 12:  FULL.
con 0:   2.
prod 1:  8.
prod 2:  FULL.
prod 1:  FULL.
con 0:   3.
prod 4:  9.
prod 5:  FULL.
prod 4:  FULL.
con 0:   4.
prod 3:  10.
prod 0:  FULL.
prod 3:  FULL.
con 0:   5.
prod 6:  11.
prod 8:  FULL.
prod 6:  FULL.
con 0:   6.
prod 7:  12.
prod 9:  FULL.
prod 7:  FULL.
con 0:   7.
prod 13:  13.
prod 11:  FULL.
prod 13:  FULL.
con 0:   8.
prod 12:  14.
prod 10:  FULL.
prod 12:  FULL.
con 0:   9.
prod 2:  15.
prod 1:  FULL.
prod 2:  FULL.
con 0:   10.
prod 5:  16.
prod 4:  FULL.
prod 5:  FULL.
con 0:   11.
prod 3:  17.
prod 0:  FULL.
prod 3:  FULL.
con 0:   12.
prod 8:  18.
prod 6:  FULL.
prod 8:  FULL.
con 0:   13.
prod 9:  19.
prod 7:  FULL.
prod 9:  FULL.
con 0:   14.
prod 11:  20.
prod 13:  FULL.
prod 11:  FULL.
con 0:   15.
prod 10:  21.
prod 12:  FULL.
prod 10:  FULL.
con 0:   16.
prod 2:  22.
prod 1:  FULL.
prod 2:  FULL.
con 0:   17.
prod 4:  23.
prod 5:  FULL.
prod 4:  FULL.
con 0:   18.
prod 0:  24.
prod 3:  FULL.
prod 0:  FULL.
con 0:   19.
prod 6:  25.
prod 8:  FULL.
prod 6:  FULL.
con 0:   20.
prod 7:  26.
prod 9:  FULL.
prod 7:  FULL.
con 0:   21.
prod 11:  27.
prod 13:  FULL.
prod 11:  FULL.
con 0:   22.
prod 12:  28.
prod 10:  FULL.
prod 12:  FULL.
con 0:   23.
prod 1:  29.
prod 2:  exited
prod 1:  exited
con 0:   24.
prod 4:  exited
prod 5:  exited
con 0:   25.
prod 3:  exited
prod 0:  exited
con 0:   26.
prod 8:  exited
prod 6:  exited
con 0:   27.
prod 9:  exited
prod 7:  exited
con 0:   28.
prod 13:  exited
prod 11:  exited
con 0:   29.
con 0:   exited
prod 10:  exited

如有任何帮助,我们将不胜感激。

完整代码,如果有人感兴趣(它很长,但我想大部分相关部分已经在我 posted 的方法中):

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>

/*
 * Define constants for how big the shared queue should be and how
 * much total work the produceers and consumers should perform
 */

#define QUEUESIZE 5
#define WORK_MAX 30

/*
 * These constants specify how much CPU bound work the producer and
 * consumer do when processing an item. They also define how long each
 * blocks when producing an item. Work and blocking are implemented
 * int he do_work() routine that uses the msleep() routine to block
 * for at least the specified number of milliseconds.
 */
#define PRODUCER_CPU   25
#define PRODUCER_BLOCK 10
#define CONSUMER_CPU   25
#define CONSUMER_BLOCK 10

/*****************************************************
 *   Shared Queue Related Structures and Routines    *
 *****************************************************/
typedef struct {
  int buf[QUEUESIZE];   /* Array for Queue contents, managed as circular queue */
  int head;             /* Index of the queue head */
  int tail;             /* Index of the queue tail, the next empty slot */  

  int full;             /* Flag set when queue is full  */
  int empty;            /* Flag set when queue is empty */

  pthread_mutex_t *mutex;     /* Mutex protecting this Queue's data */
  pthread_cond_t  *notFull;   /* Used by producers to await room to produce*/
  pthread_cond_t  *notEmpty;  /* Used by consumers to await something to consume*/
} queue;

/*
 * Create the queue shared among all producers and consumers
 */
queue *queueInit (void)
{
  queue *q;

  /*
   * Allocate the structure that holds all queue information
   */
  q = (queue *)malloc (sizeof (queue));
  if (q == NULL) return (NULL);

  /*
   * Initialize the state variables. See the definition of the Queue
   * structure for the definition of each.
   */
  q->empty = 1;  
  q->full  = 0;   

  q->head  = 0;   
  q->tail  = 0;   

  /*
   * Allocate and initialize the queue mutex
   */
  q->mutex = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t));
  pthread_mutex_init (q->mutex, NULL);

  /*
   * Allocate and initialize the notFull and notEmpty condition
   * variables
   */
  q->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
  pthread_cond_init (q->notFull, NULL);

  q->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
  pthread_cond_init (q->notEmpty, NULL);

  return (q);
}

/*
 * Delete the shared queue, deallocating dynamically allocated memory
 */
void queueDelete (queue *q)
{
  /*
   * Destroy the mutex and deallocate its memory
   */
  pthread_mutex_destroy (q->mutex);
  free (q->mutex);
  
  /*
   * Destroy and deallocate the condition variables
   */
  pthread_cond_destroy (q->notFull);
  free (q->notFull);

  pthread_cond_destroy (q->notEmpty);
  free (q->notEmpty);

  /*
   * Deallocate the queue structure
   */
  free (q);
}

void queueAdd (queue *q, int in)
{
  
  /*
   * Put the input item into the free slot
   */
  q->buf[q->tail] = in;
  q->tail++;

  /*
   * wrap the value of tail around to zero if we reached the end of
   * the array. This implements the circularity of the queue inthe
   * array.
   */
  if (q->tail == QUEUESIZE)
    q->tail = 0;

  /*
   * If the tail pointer is equal to the head, then the enxt empty
   * slot in the queue is occupied and the queue is FULL
   */
  if (q->tail == q->head)
    q->full = 1;

  /*
   * Since we just added an element to the queue, it is certainly not
   * empty.
   */
  q->empty = 0;

  return;
}

void queueRemove (queue *q, int *out)
{
  /*
   * Copy the element at head into the output variable and increment
   * the head pointer to move to the next element.
   */
  *out = q->buf[q->head];
  q->head++;

  /*
   * Wrap the index around to zero if it reached the size of the
   * array. This implements the circualrity of the queue int he array.
   */
  if (q->head == QUEUESIZE)
    q->head = 0;

  /*
   * If head catches up to tail as we delete an item, then the queue
   * is empty.
   */
  if (q->head == q->tail)
    q->empty = 1;

  /*
   * since we took an item out, the queue is certainly not full
   */
  q->full = 0;

  return;
}

/******************************************************
 *   Producer and Consumer Structures and Routines    *
 ******************************************************/
/*
 * Argument struct used to pass consumers and producers thier
 * arguments.  
 * 
 * q     - arg provides a pointer to the shared queue. 
 *
 * count - arg is a pointer to a counter for this thread to track how
 *         much work it did.
 *
 * tid   - arg provides the ID number of the producer or consumer, 
 *         whichis also its index into the array of thread structures.
 * 
 */
typedef struct {
  queue *q;       
  int   *count;   
  int    tid;
} pcdata;

int memory_access_area[100000];


/*
 * Sleep for a specified number of milliseconds. We use this to
 * simulate I/O, since it will block the process. Different lengths fo
 * sleep simulate interaction with different devices.
 */
void msleep(unsigned int milli_seconds)
{
  struct timespec req = {0}; /* init struct contents to zero */
  time_t          seconds;

  /*
   * Convert number of milliseconds input to seconds and residual
   * milliseconds to handle the cse where input is more than one
   * thousand milliseconds.
   */
  seconds        = (milli_seconds/1000);
  milli_seconds  = milli_seconds - (seconds * 1000);

  /*
   * Fill in the time_spec's seconds and nanoseconds fields. Note we
   * multiply millisconds by 10^6 to convert to nanoseconds.
   */
  req.tv_sec  = seconds;
  req.tv_nsec = milli_seconds * 1000000L;

  /*
   * Sleep for the required period. The first parameter specifies how
   * long. In theory this thread can be awakened before the period is
   * over, perhaps by a signal. If so the timespec specified by the
   * second argument is filled in with how much time int he original
   * request is left. We use the same one. If this happens, we just
   * call nanosleep again to sleep for what remains of the origianl
   * request.
   */
  while(nanosleep(&req, &req)==-1) {
    printf("restless\n");
    continue;
  }

}

/*
 * Simulate doing work. 
 */
void do_work(int cpu_iterations, int blocking_time)
{
  int i;
  int j;
  int local_var;

  local_var = 0;
  for (j = 0; j < cpu_iterations; j++ ) {
    for (i = 0; i < 1000; i++ ) {
      local_var = memory_access_area[i];
    }
  }
  
  if ( blocking_time > 0 ) {
    msleep(blocking_time);
  }
}

void *producer (void *parg)
{
  queue  *fifo;
  int     item_produced;
  pcdata *mydata;
  int     my_tid;
  int    *total_produced;

  mydata = (pcdata *) parg;

  fifo           = mydata->q;
  total_produced = mydata->count;
  my_tid         = mydata->tid;

  /*
   * Continue producing until the total produced reaches the
   * configured maximum
   */
  while (1) {
      
   
    pthread_mutex_lock(fifo->mutex);
    do_work(PRODUCER_CPU, PRODUCER_BLOCK);
 
    /*
     * If the queue is full, we have no place to put anything we
     * produce, so wait until it is not full.
     */
    while (fifo->full && *total_produced != WORK_MAX) {
      printf ("prod %d:  FULL.\n", my_tid);
      pthread_cond_wait(fifo->notFull, fifo->mutex);
  
    }

    /*
     * Check to see if the total produced by all producers has reached
     * the configured maximum, if so, we can quit.
     */
    if (*total_produced >= WORK_MAX) {
      pthread_mutex_unlock(fifo->mutex);
      break;
    }

    /*
     * OK, so we produce an item. Increment the counter of total
     * widgets produced, and add the new widget ID, its number, to the
     * queue.
     */
    item_produced = (*total_produced)++;
    queueAdd (fifo, item_produced);

    /*
     * Announce the production outside the critical section 
     */
    printf("prod %d:  %d.\n", my_tid, item_produced);
    pthread_cond_signal(fifo->notEmpty);
    pthread_mutex_unlock(fifo->mutex);
  }

  printf("prod %d:  exited\n", my_tid);
  return (NULL);
}

void *consumer (void *carg)
{
  queue  *fifo;
  int     item_consumed;
  pcdata *mydata;
  int     my_tid;
  int    *total_consumed;

  mydata = (pcdata *) carg;

  fifo           = mydata->q;
  total_consumed = mydata->count;
  my_tid         = mydata->tid;

  /*
   * Continue producing until the total consumed by all consumers
   * reaches the configured maximum
   */
  while (1) {
     pthread_mutex_lock(fifo->mutex);
    /*
     * If the queue is empty, there is nothing to do, so wait until it
     * si not empty.
     */
    while (fifo->empty && *total_consumed != WORK_MAX) {
      printf ("con %d:   EMPTY.\n", my_tid);
      pthread_cond_wait(fifo->notEmpty, fifo->mutex);
    }

    /*
     * If total consumption has reached the configured limit, we can
     * stop
     */
    if (*total_consumed >= WORK_MAX) {
       pthread_mutex_unlock(fifo->mutex);
      break;
    }

    /*
     * Remove the next item from the queue. Increment the count of the
     * total consumed. Note that item_consumed is a local copy so this
     * thread can retain a memory of which item it consumed even if
     * others are busy consuming them. 
     */
    queueRemove (fifo, &item_consumed);
    (*total_consumed)++;
    do_work(CONSUMER_CPU,CONSUMER_CPU);

    printf ("con %d:   %d.\n", my_tid, item_consumed);

    pthread_cond_signal(fifo->notFull);
    pthread_mutex_unlock(fifo->mutex);

    
    
    
  }

  printf("con %d:   exited\n", my_tid);
  return (NULL);
}

/***************************************************
 *   Main allocates structures, creates threads,   *
 *   waits to tear down.                           *
 ***************************************************/
int main (int argc, char *argv[])
{
  pthread_t *con;
  int        cons;
  int       *concount;

  queue     *fifo;
  int        i;

  pthread_t *pro;
  int       *procount;
  int        pros;

  pcdata    *thread_args;

  /*
   * Check the number of arguments and determine the numebr of
   * producers and consumers
   */
  if (argc != 3) {
    printf("Usage: producer_consumer number_of_producers number_of_consumers\n");
    exit(0);
  }

  pros = atoi(argv[1]);
  cons = atoi(argv[2]);

  /*
   * Create the shared queue
   */
  fifo = queueInit ();
  if (fifo ==  NULL) {
    fprintf (stderr, "main: Queue Init failed.\n");
    exit (1);
  }

  /*
   * Create a counter tracking how many items were produced, shared
   * among all producers, and one to track how many items were
   * consumed, shared among all consumers.
   */
  procount = (int *) malloc (sizeof (int));
  if (procount == NULL) { 
    fprintf(stderr, "procount allocation failed\n"); 
    exit(1); 
  }
  
  concount = (int *) malloc (sizeof (int));
  if (concount == NULL) { 
    fprintf(stderr, "concount allocation failed\n"); 
    exit(1); 
  }

  /*
   * Create arrays of thread structures, one for each producer and
   * consumer
   */
  pro = (pthread_t *) malloc (sizeof (pthread_t) * pros);
  if (pro == NULL) { 
    fprintf(stderr, "pros\n"); 
    exit(1); 
  }

  con = (pthread_t *) malloc (sizeof (pthread_t) * cons);
  if (con == NULL) { 
    fprintf(stderr, "cons\n"); 
    exit(1); 
  }

  /*
   * Create the specified number of producers
   */
  for (i=0; i<pros; i++){ 
    /*
     * Allocate memory for each producer's arguments
     */
    thread_args = (pcdata *)malloc (sizeof (pcdata));
    if (thread_args == NULL) {
      fprintf (stderr, "main: Thread_Args Init failed.\n");
      exit (1);
    }

    /*
     * Fill them in and then create the producer thread
     */
    thread_args->q     = fifo;
    thread_args->count = procount;
    thread_args->tid   = i;
    pthread_create (&pro[i], NULL, producer, thread_args);
  }

  /*
   * Create the specified number of consumers
   */
  for (i=0; i<cons; i++){
    /*
     * Allocate space for next consumer's args
     */
    thread_args = (pcdata *)malloc (sizeof (pcdata));
    if (thread_args == NULL) {
      fprintf (stderr, "main: Thread_Args Init failed.\n");
      exit (1);
    }

    /*
     * Fill them in and create the thread
     */
    thread_args->q     = fifo;
    thread_args->count = concount;
    thread_args->tid   = i;
    pthread_create (&con[i], NULL, consumer, thread_args);
  }

  /*
   * Wait for the create producer and consumer threads to finish
   * before this original thread will exit. We wait for all the
   * producers before waiting for the consumers, but that is an
   * unimportant detail.
   */
  for (i=0; i<pros; i++)
    pthread_join (pro[i], NULL);
  for (i=0; i<cons; i++)
    pthread_join (con[i], NULL);

  /*
   * Delete the shared fifo, now that we know there are no users of
   * it. Since we are about to exit we could skip this step, but we
   * put it here for neatness' sake.
   */
  queueDelete (fifo);

  return 0;
}

I am trying to run the producer-consumer problem while using pthread_cond_signal() instead of pthread_cond_broadcast(), however, I attempted a lot of things and can't seem to do it (if I choose n producers and one consumer then the consumer finishes but not all producers finish, some are stuck on full queue so the problem never finishes executing.

嗯,这听起来非常有道理。如果您有多个线程阻塞在一个 CV 上,那么一个信号将唤醒其中一个。其余的将保持阻塞状态。

我通常倾向于走另一条路。如果你正确地使用你的 CV,那么总是向它广播而不是向它发送信号是安全的,但相反的做法会暴露更多可能的错误区域,尤其是当涉及两个以上的线程时。

特别是对于关机场景,我建议只使用广播。您需要唤醒潜在的多个线程,而这正是 pthread_cond_broadcast() 的用途。如果您愿意,可以让主线程代替消费者或生产者执行此操作。但是,如果您坚持只使用 pthread_cond_signal(),那么您必须确保调用该函数的次数足够多,以唤醒所有可能在 CV 上被阻塞的线程。同样,这些调用中的一些或全部可以由主线程执行。

更新

尽管我提出了上述广播建议,但仅通过信号获得干净关闭的一种相对较好的方法是让每个生产者在终止之前发出 notFull CV 信号。你可以把它放在几个地方,但我自己可能会这样做:

    if (*total_produced >= WORK_MAX) {
      pthread_mutex_unlock(fifo->mutex); //if total work reached then exit
      pthread_cond_signal(fifo->notFull); // other producers should wake up and exit, too
      break;
    }

请注意,互斥锁当前不需要被发送广播或信号的线程锁定。

另请注意,如果您走这条路,那么消费者希望得到类似的待遇。

最后,请注意,对于这段特定的代码,从功能的角度来看,您尝试执行的转换不是一个好主意,尤其是对于生产者和消费者数量不同的情况。无论一开始有多少,它都会趋向于减少到拥有相同数量的活跃​​生产者和消费者,并且可能在更长的时间范围内,在任何给定时间最多拥有一个。这些后果来自多个消费者或多个生产者同时在 CV 上被阻止的情况。