多线程、消费者和生产者

Multi-threading, consumers and producers

我对多线程有疑问,因为我是这个主题的新手。下面的代码是我从大学获得的代码。它有几个版本,我能理解其中的大部分。但我并不真正理解 nready.nready 变量和所有这些线程条件。谁能描述一下这两个在这里是如何工作的?为什么我不能通过互斥锁同步线程的工作?

#include    "unpipc.h"

#define MAXNITEMS       1000000
#define MAXNTHREADS         100

    /* globals shared by threads */
int     nitems;             /* read-only by producer and consumer */
int     buff[MAXNITEMS];

struct {
pthread_mutex_t mutex;
pthread_cond_t  cond;
int             nput;
int             nval;
int             nready;
} nready = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };

void    *produce(void *), *consume(void *);

/* include main */
int
main(int argc, char **argv)
{
int         i, nthreads, count[MAXNTHREADS];
pthread_t   tid_produce[MAXNTHREADS], tid_consume;

if (argc != 3)
    err_quit("usage: prodcons5 <#items> <#threads>");
nitems = min(atoi(argv[1]), MAXNITEMS);
nthreads = min(atoi(argv[2]), MAXNTHREADS);

Set_concurrency(nthreads + 1);
    /* 4create all producers and one consumer */
for (i = 0; i < nthreads; i++) {
    count[i] = 0;
    Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
Pthread_create(&tid_consume, NULL, consume, NULL);

    /* wait for all producers and the consumer */
for (i = 0; i < nthreads; i++) {
    Pthread_join(tid_produce[i], NULL);
    printf("count[%d] = %d\n", i, count[i]);    
}
Pthread_join(tid_consume, NULL);

exit(0);
}
/* end main */

void *
produce(void *arg)
{
    for ( ; ; ) {
    Pthread_mutex_lock(&nready.mutex);
    if (nready.nput >= nitems) {
        Pthread_mutex_unlock(&nready.mutex);
        return(NULL);       /* array is full, we're done */
    }
    buff[nready.nput] = nready.nval;
    nready.nput++;
    nready.nval++;
    nready.nready++;
    Pthread_cond_signal(&nready.cond);
    Pthread_mutex_unlock(&nready.mutex);
    *((int *) arg) += 1;
}
}

/* include consume */
void *
consume(void *arg)
{
int     i;

for (i = 0; i < nitems; i++) {
    Pthread_mutex_lock(&nready.mutex);
    while (nready.nready == 0)
        Pthread_cond_wait(&nready.cond, &nready.mutex);
    nready.nready--;
    Pthread_mutex_unlock(&nready.mutex);

    if (buff[i] != i)
        printf("buff[%d] = %d\n", i, buff[i]);
}
return(NULL);
}
/* end consume */
"But I don't really understand the nready.nready variable"

this results from the struct instance being named 'nready' and there 
is a field within the struct named 'nready'

IMO: a very poor design to have two different objects being given the same name

the nready field of the nready struct seems to be keeping track of the number of 
items that have been 'produced'
pthread_mutex_lock(&nready.mutex);
while (nready.nready == 0)
    pthread_cond_wait(&nready.cond, &nready.mutex);
nready.nready--;
pthread_mutex_unlock(&nready.mutex);

这个结构的重点是保证当你执行相应的动作(nready.nready--)时条件(nready.nready == 0)仍然为真,或者 - 如果条件不满意 - 等到它没有使用 CPU 时间。

您只能使用互斥锁来检查条件是否正确并以原子方式执行相应的操作。但是如果条件不满足,你就不知道该怎么办了。等待?到什么时候?再检查一遍?释放互斥量并在之后立即重新检查?那会浪费 CPU 时间...

pthread_cond_signal()和pthread_cond_wait()就是为了解决这个问题。您应该查看他们的手册页。

简而言之,pthread_cond_wait 的作用是让调用线程休眠并以原子方式释放互斥锁,直到它收到信号为止。所以这是一个阻塞函数。然后可以通过从不同的线程调用信号或广播来重新安排线程。当线程收到信号时,它会再次获取互斥量并退出 wait() 函数。

此时你知道

  1. 你的条件是正确的
  2. 你持有互斥量。

因此,您可以对数据做任何需要做的事情。

不过要小心,如果您不确定另一个线程是否会发出信号,则不应调用 wait。这是一个非常常见的死锁来源。

当线程收到信号时,它会被放入准备好被调度的线程列表中。到线程实际执行时,您的条件(即 nread.nready == 0)可能再次为假。因此 while (重新检查线程是否被唤醒)。

1) struct nreadynready字段用于记录有多少任务准备消费,即数组buff中剩余的任务。 nready.nready++; 语句仅在生产者向数组 buff 中添加一项时执行,而 nready.nready--; 仅在消费者从 buff 中获取一项时执行。通过 is 变量,程序员可以随时跟踪还有多少任务需要处理。

2)

pthread_mutex_lock(&nready.mutex);
while (nready.nready == 0)
    pthread_cond_wait(&nready.cond, &nready.mutex);
nready.nready--;
pthread_mutex_unlock(&nready.mutex);

以上语句是常见的条件变量用法。你可以检查 POSIX Threads Programming and Condition Variables 有关条件变量的更多信息。

为什么不能只用mutex?您可以一次又一次地轮询互斥锁。显然,它 CPU 耗时并且可能会极大地影响系统性能。相反,您希望消费者在 buff 中没有更多项目时进入睡眠状态,并在生产者将新项目放入 buff 时唤醒。条件变量在这里就充当了这个角色。当没有项目(nready.nready==0)时,pthread_cond_wait()函数让当前线程进入休眠状态,节省宝贵的cpu时间。当新商品到达时,Pthread_cond_signal() 唤醒消费者。