多生产者消费者执行的效率

Efficiency of the multi producer-consumer execution

我正在尝试制作一个包含多个生产者和消费者的代码。我为生产者和消费者创建了多线程,并使用信号量进行同步。该代码在单个生产者和消费者中运行良好。

我面临的问题是,在程序执行一段时间后,只有 consumer1 和 producer1 参与了这个过程。我无法理解其他生产者和消费者发生了什么。

我也想知道如何使多生产者-消费者问题变得高效?在所有生产者和消费者都有平等的机会分别生产和消费的意义上有效率吗? C++代码(包含大量C):

#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <queue>
using namespace std;
sem_t empty;
sem_t full;
int cnt = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
queue<int> q;
void *producer(void *a)
{   
    int *num = (int *)a;
    while(1) {
        sem_wait(&empty);
        pthread_mutex_lock(&mutex);
        cnt = cnt+1;
        q.push(cnt);
        cout<<cnt<<" item produced by producer "<<(*num+1)<<endl;
        pthread_mutex_unlock(&mutex);
        sem_post(&full);
        sleep(1);
    }
}
void *consumer(void *a)
{   
    int *num = (int *)a;
    while(1) {
        sem_wait(&full);
        pthread_mutex_lock(&mutex);
        cout<<q.front()<<" item consumed by consumer "<<(*num+1)<<endl;
        q.pop();
        pthread_mutex_unlock(&mutex);
        sem_post(&empty);
        sleep(1);
    }
}
int main()
{   
    pthread_t p[5];
    pthread_t c[5];
    sem_init(&empty,0,5);
    sem_init(&full,0,0);
    int i;
    for(i = 0; i < 5; i++) {
        pthread_create(&p[i],NULL,producer,(void *)(&i));
    }
    for(i = 0; i < 5; i++) {
        pthread_create(&c[i],NULL,consumer,(void *)(&i));
    }
    for(i = 0; i < 5; i++) {
        pthread_join(p[i],NULL);
        pthread_join(c[i],NULL);
    }
}

更新代码:

#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <queue>
#include <map>
using namespace std;
sem_t empty;
sem_t full;
int cnt = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
map<pthread_t,int> mc,mp;
queue<int> q;
void *producer(void *a)
{   
    while(1) {
        sem_wait(&empty);
        pthread_mutex_lock(&mutex);
        cnt = cnt+1;
        q.push(cnt);
        cout<<cnt<<" item produced by producer "<<mp[pthread_self()]<<endl;
        pthread_mutex_unlock(&mutex);
        sem_post(&full);
        sleep(1);
    }
}
void *consumer(void *a)
{   
    while(1) {
        sem_wait(&full);
        pthread_mutex_lock(&mutex);
        cout<<q.front()<<" item consumed by consumer "<<mc[pthread_self()]<<endl;
        q.pop();
        pthread_mutex_unlock(&mutex);
        sem_post(&empty);
        sleep(1);
    }
}
int main()
{   
    pthread_t p[5];
    pthread_t c[5];
    sem_init(&empty,0,5);
    sem_init(&full,0,0);
    int i;
    pthread_mutex_lock(&mutex);
    for(i = 0; i < 5; i++) {
        pthread_create(&p[i],NULL,producer,NULL);
        pthread_create(&c[i],NULL,consumer,NULL);
        mc[c[i]] = i+1;
        mp[p[i]] = i+1; 
    }
    pthread_mutex_unlock(&mutex);
    for(i = 0; i < 5; i++) {
        pthread_join(p[i],NULL);
        pthread_join(c[i],NULL);
    }
}

简答

线程实际上执行的机会均等,但它们只是打印出一个不属于它们的标识符。

详细解释

您在每个线程中保留一个指向线程编号的指针 num。它是指向被保存的值的指针,而不是值本身。所以所有的线程都指向同一个计数器,想在那里找到它们自己的标识符。

每次访问 *num 时,您访问的不是 i 启动线程时的值,而是它的当前值。

不幸的是,在 main() 的每个循环中,您都重复使用了变量 i。所以在最后一个循环中,您将 i 设置回 0,并等待第一个线程加入。但是所有这些线程永远循环,所以循环几乎没有机会超过这个初始 0 值。这样每个线程都认为此时 *num+1 是 1。

顺便请注意,正如有人在评论中指出的那样,您创建了一个竞争条件:所有消费者和生产者线程取消引用指针,访问互斥锁保护区域中的同一个变量。还行吧。但是当他们读取变量时,主线程仍然可以愉快地在任何锁之外更改共享变量。这绝对是种族风险。

解决方法

std::thread 将允许您通过 walue 传递 i,以便每个线程都有其自己的 is id 的未更改副本。

对于 pthreads,你必须传递一个指向值的指针。不幸的是,即使您在线程的开头对所指向的值进行了本地复制,您仍然会处于竞争状态。

观察哪个线程真正在做工作的快速解决方法是打印输出 pthread_self() (see here 如何做)的结果。或者将 id 存储在一个 int 数组中,并将该数组中唯一元素的地址传递给每个线程。