生产者/消费者任务。正确写入共享缓冲区的问题

producer / consumer task. Problem with correct writing to shared buffer

我正在从事一个解决生产者/消费者调度的经典问题的项目。 Linux打开Suse 42.3 Leep,APISystem V,C语言

该项目由三个程序组成:producer、consumer和scheduler。 调度程序的目的是创建 3 个信号量,共享内存,其中有一个缓冲区(数组),其中有写入(生产者)和读取(消费者)以及 运行 n 个生产者和 m 个消费者进程。

每个生产者必须对缓冲区执行 k 个写入周期,而消费者必须执行 k 个读取周期。

使用了 3 个信号量:互斥、空和满。全信号量的值在程序中用作数组中的索引。

问题是:例如,当缓冲区大小为3时,生产者写入4份数据,当缓冲区大小为4 - 5份数据(虽然应该是4份)...

消费者正常阅读。

此外,该程序在调用 get_semVal 函数时无法预测。

请帮忙,我将非常非常感谢您的回答。

制作人

#define BUFFER_SIZE 3
#define MY_RAND_MAX 99      // Highest integer for random number generator
#define LOOP 3              //the number of write / read cycles for each process
#define DATA_DIMENSION 4    // size of portion of data for 1 iteration 
struct Data {
int buf[DATA_DIMENSION];
};
typedef struct Data buffer_item;

buffer_item buffer[BUFFER_SIZE];

void P(int semid)
{
struct sembuf op;
op.sem_num = 0;
op.sem_op = -1;
op.sem_flg = 0;
semop(semid,&op,1);
}

void V(int semid)
{
struct sembuf op;
op.sem_num = 0;
op.sem_op = +1;
op.sem_flg = 0;
semop(semid,&op,1);
}

void Init(int semid,int index,int value)
{
    semctl(semid,index,SETVAL,value);
}

int get_semVal(int sem_id)
{
    int value = semctl(sem_id,0,GETVAL,0);
    return value;
}

int main()
{
    sem_mutex = semget(KEY_MUTEX,1,0);
    sem_empty = semget(KEY_EMPTY,1,0);
    sem_full = semget(KEY_FULL,1,0);

    srand(time(NULL));

const int SIZE = sizeof(buffer[BUFFER_SIZE]);

shm_id = shmget(KEY_SHARED_MEMORY,SIZE, 0);

int i=0;
buffer_item *adr;
do {
    buffer_item nextProduced;
   
    P(sem_empty);

    P(sem_mutex);

    //prepare portion of data
    for(int j=0;j<DATA_DIMENSION;j++)
    {
        nextProduced.buf[j]=rand()%5;
    }

    adr = (buffer_item*)shmat(shm_id,NULL,0);

    int full_value = get_semVal(sem_full);//get index of array
    printf("-----%d------\n",full_value-1);//it’s for test the index of array in buffer

   // write the generated portion of data by index full_value-1
    adr[full_value-1].buf[0] = nextProduced.buf[0];
    adr[full_value-1].buf[1] = nextProduced.buf[1];
    adr[full_value-1].buf[2] = nextProduced.buf[2];
    adr[full_value-1].buf[3] = nextProduced.buf[3];

    shmdt(adr);


    printf("producer %d produced %d %d %d %d\n", getpid(), nextProduced.buf[0],nextProduced.buf[1],nextProduced.buf[2],nextProduced.buf[3]);

    V(sem_mutex);
    V(sem_full);

    i++;
    } while (i<LOOP);

V(sem_empty); 
sleep(1); 
 }

消费者

 …
int main()
{

sem_mutex = semget(KEY_MUTEX,1,0);
sem_empty = semget(KEY_EMPTY,1,0);
sem_full = semget(KEY_FULL,1,0);

srand(time(NULL));

const int SIZE = sizeof(buffer[BUFFER_SIZE]);

shm_id = shmget(KEY_SHARED_MEMORY,SIZE,0);

int i=0;
buffer_item *adr;
do
{
    buffer_item nextConsumed;

    P(sem_full);
    P(sem_mutex);

    int full_value = get_semVal(sem_full);

    adr = (buffer_item*)shmat(shm_id,NULL,0);

    for(int i=0;i<BUFFER_SIZE;i++)
    {
        printf("--%d %d %d %d\n",adr[i].buf[0],adr[i].buf[1],adr[i].buf[2],adr[i].buf[3]);
    }

    for(int i=0;i<BUFFER_SIZE;i++)
    {
        buffer[i].buf[0] = adr[i].buf[0];
        buffer[i].buf[1] = adr[i].buf[1];
        buffer[i].buf[2] = adr[i].buf[2];
        buffer[i].buf[3] = adr[i].buf[3];
    }

    tab(nextConsumed);

        nextConsumed.buf[0]=buffer[full_value-1].buf[0];
        nextConsumed.buf[1]=buffer[full_value-1].buf[1];
        nextConsumed.buf[2]=buffer[full_value-1].buf[2];
        nextConsumed.buf[3]=buffer[full_value-1].buf[3];



    // Set buffer to 0 since we consumed that item
    for(int j=0;j<DATA_DIMENSION;j++)
    {
        buffer[full_value-1].buf[j]=0;
    }

    for(int i=0;i<BUFFER_SIZE;i++)
    {
        adr[i].buf[0]=buffer[i].buf[0];
        adr[i].buf[1]=buffer[i].buf[1];
        adr[i].buf[2]=buffer[i].buf[2];
        adr[i].buf[3]=buffer[i].buf[3];

    }

    shmdt(adr);

    printf("consumer %d consumed %d %d %d %d\n", getpid() ,nextConsumed.buf[0],nextConsumed.buf[1],nextConsumed.buf[2],nextConsumed.buf[3]);

    V(sem_mutex);
    // increase empty
    V(sem_empty);

    i++;
    } while (i<LOOP);

V(sem_full);
sleep(1);
}

调度程序

…
struct Data {
   int buf[DATA_DIMENSION];
 };
typedef struct Data buffer_item;

buffer_item buffer[BUFFER_SIZE];

struct TProcList
{
  pid_t processPid;
};
typedef struct TProcList ProcList;
 …

ProcList createProcess(char *name)
{
   pid_t pid;
   ProcList a;

   pid = fork();
   if (!pid){
      kill(getpid(),SIGSTOP);
      execl(name,name,NULL);
      exit(0);
   }
   else if(pid){
       a.processPid=pid;
   }
   else
       cout<<"error forking"<<endl;
   return a;
   }

   int main()
   {
      sem_mutex = semget(KEY_MUTEX,1,IPC_CREAT|0600);
      sem_empty = semget(KEY_EMPTY,1,IPC_CREAT|0600);
      sem_full = semget(KEY_FULL,1,IPC_CREAT|0600);

Init(sem_mutex,0,1);//unlock mutex
Init(sem_empty,0,BUFFER_SIZE); 
Init(sem_full,0,0);//unlock empty

const int SIZE = sizeof(buffer[BUFFER_SIZE]);

shm_id = shmget(KEY_SHARED_MEMORY,SIZE,IPC_CREAT|0600);

buffer_item *adr;
adr = (buffer_item*)shmat(shm_id,NULL,0);

for(int i=0;i<BUFFER_SIZE;i++)
{
    buffer[i].buf[0]=0;
    buffer[i].buf[1]=0;
    buffer[i].buf[2]=0;
    buffer[i].buf[3]=0;
}

for(int i=0;i<BUFFER_SIZE;i++)
{
    adr[i].buf[0] = buffer[i].buf[0];
    adr[i].buf[1] = buffer[i].buf[1];
    adr[i].buf[2] = buffer[i].buf[2];
    adr[i].buf[3] = buffer[i].buf[3];

}

int consumerNumber = 2;
int produserNumber = 2;

ProcList producer_pids[produserNumber];
ProcList consumer_pids[consumerNumber];

for(int i=0;i<produserNumber;i++)
{
    producer_pids[i]=createProcess("/home/andrey/build-c-unknown-Debug/c");//create sleeping processes 
}

for(int i=0;i<consumerNumber;i++)
{
    consumer_pids[i]=createProcess("/home/andrey/build-p-unknown-Debug/p");
}

    sleep(3);

for(int i=0;i<produserNumber;i++)
{
   kill(producer_pids[i].processPid,SIGCONT);//continue processes
   sleep(1);
}
for(int i=0;i<consumerNumber;i++)
{
   kill(consumer_pids[i].processPid,SIGCONT);
   sleep(1);
}


for(int i=0;i<produserNumber;i++)
    {
        waitpid(producer_pids[i].processPid,&stat,WNOHANG);//wait
    }
for(int i=0;i<consumerNumber;i++)
    {
        waitpid(consumer_pids[i].processPid,&stat,WNOHANG);
    }

shmdt(adr);

semctl(sem_mutex,0,IPC_RMID);
semctl(sem_full,0,IPC_RMID);
semctl(sem_empty,0,IPC_RMID);

}

尝试解开别人编写的未注释代码并不有趣,因此,我将解释一个经过验证的工作方案。

(请注意,注释应始终解释程序员的意图或想法,而不是代码 做了什么 ;我们可以阅读代码以查看它的作用。问题是,我们需要首先了解程序员 idea/intent,然后我们才能将其与实现进行比较。如果没有注释,我需要先阅读代码以尝试猜测意图,然后将其与代码本身进行比较;它是就像双倍的工作。)

(我怀疑 OP 的潜在问题是试图使用信号量值作为缓冲区索引,但没有仔细检查所有代码以确保 100% 确定。)

假设共享内存结构如下所示:

struct shared {
    sem_t       lock;            /* Initialized to value 1 */
    sem_t       more;            /* Initialized to 0 */
    sem_t       room;            /* Initialized to MAX_ITEMS */
    size_t      num_items;       /* Initialized to 0 */
    size_t      next_item;       /* Initialized to 0 */
    item_type   item[MAX_ITEMS];
};

我们有struct shared *mem指向共享内存区域。

请注意,您应该在 运行 时包含 <limits.h>,并验证 MAX_ITEMS <= SEM_VALUE_MAX。否则 MAX_ITEMS 太大,这个信号量方案可能会失败。 (Linux上的SEM_VALUE_MAX通常是INT_MAX,足够大了,但可能会有所不同。而且,如果编译时使用-O进行优化,检查将被完全优化离开。所以这是一张非常便宜且合理的支票。)

mem->lock 信号量像互斥锁一样使用。也就是说,为了独占访问锁定结构,一个进程等待它。完成后,post就可以了。

请注意,虽然 sem_post(&(mem->lock)) 总是会成功(忽略 mem 为 NULL 或指向未初始化的内存或被垃圾覆盖等错误),但从技术上讲,sem_wait() 可以被中断通过向未安装 SA_RESTART 标志的用户空间处理程序传递信号。这就是为什么我建议使用静态内联辅助函数而不是 sem_wait():

static inline int  semaphore_wait(sem_t *const s)
{
    int  result;
    do {
        result = sem_wait(s);
    } while (result == -1 && errno == EINTR);
    return result;
}

static inline int  semaphore_post(sem_t *const s)
{
    return sem_post(s);
}

在信号传递不应该中断等待信号量的情况下,您使用 semaphore_wait()。如果您确实希望信号传递中断对信号量的等待,请使用 sem_wait();如果它 returns -1errno == EINTR,操作由于信号传递而中断,并且信号量实际上并没有减少。 (许多其他低级函数,如 read()write()send()recv(),可以完全相同的方式被中断;它们也可以只是 return一个短计数,以防中途中断。)

semaphore_post() 只是一个包装器,因此您可以使用“匹配`post 和等待操作。做那种“无用”的包装器确实有助于理解代码,您看。

item[]数组用作循环队列。 num_items 表示其中的项目数。如果 num_items > 0,下一个要消耗的项目是 item[next_item]。如果num_items < MAX_ITEMS,下一个要生产的项目是item[(next_item + num_items) % MAX_ITEMS]

%是模运算符。这里,因为 next_itemnum_items 总是正数,所以 (next_item + num_items) % MAX_ITEMS 总是在 0MAX_ITEMS - 1 之间,包括在内。这就是缓冲区循环的原因。

当一个生产者构造了一个新的项目,比如说item_type newitem;,并想把它添加到共享内存中,它基本上会做以下事情:

    /* Omitted: Initialize and fill in 'newitem' members */

    /* Wait until there is room in the buffer */
    semaphore_wait(&(mem->room));

    /* Get exclusive access to the structure members */
    semaphore_wait(&(mem->lock));

    mem->item[(mem->next_item + mem->num_items) % MAX_ITEMS] = newitem;
    mem->num_items++;
    sem_post(&(mem->more));

    semaphore_post(&(mem->lock));

上面通常被称为enqueue,因为它将一个项目追加到队列中(恰好通过循环缓冲区实现)。

当消费者想要消费共享缓冲区中的项目 (item_type nextitem;) 时,它会执行以下操作:

    /* Wait until there are items in the buffer */
    semaphore_wait(&(mem->more));

    /* Get exclusive access to the structure members */
    semaphore_wait(&(mem->lock));

    nextitem = mem->item[mem->next_item];
    mem->next_item = (mem->next_item + 1) % MAX_ITEMS;
    mem->num_items = mem->num_items - 1;

    semaphore_post(&(mem->room));

    mem->item[(mem->next_item + mem->num_items) % MAX_ITEMS] = newitem;
    mem->num_items++;
    sem_post(&(mem->more));

    semaphore_post(&(mem->lock));

    /* Omitted: Do work on 'nextitem' here. */

这通常被称为dequeue,因为它从队列中获取下一个项目。

我建议你先写一个单进程测试用例,入队 MAX_ITEMS,然后出队,并验证信号量值是否恢复到初始值。这并不能保证正确性,但可以解决最典型的错误。

实际上,我个人会在描述共享内存结构的同一个头文件中将排队函数编写为静态内联助手。差不多

static inline int  shared_get(struct shared *const mem, item_type *const into)
{
    int  err;

    if (!mem || !into)
        return errno = EINVAL; /* Set errno = EINVAL, and return EINVAL. */

    /* Wait for the next item in the buffer. */
    do {
        err = sem_wait(&(mem->more));
    } while (err == -1 && errno == EINTR);
    if (err)
        return errno;

    /* Exclusive access to the structure. */
    do {
        err = sem_wait(&(mem->lock));
    } while (err == -1 && errno == EINTR);

    /* Copy item to caller storage. */
    *into = mem->item[mem->next_item];

    /* Update queue state. */
    mem->next_item = (mem->next_item + 1) % MAX_ITEMS;
    mem->num_items--;

    /* Account for the newly freed slot. */
    sem_post(&(mem->room));

    /* Done. */
    sem_post(&(mem->lock));
    return 0;
}       

static inline int  shared_put(struct shared *const mem, const item_type *const from)
    int  err;

    if (!mem || !into)
        return errno = EINVAL; /* Set errno = EINVAL, and return EINVAL. */

    /* Wait for room in the buffer. */
    do {
        err = sem_wait(&(mem->room));
    } while (err == -1 && errno == EINTR);
    if (err)
        return errno;

    /* Exclusive access to the structure. */
    do {
        err = sem_wait(&(mem->lock));
    } while (err == -1 && errno == EINTR);

    /* Copy item to queue. */
    mem->item[(mem->next_item + mem->num_items) % MAX_ITEMS] = *from;

    /* Update queue state. */
    mem->num_items++;

    /* Account for the newly filled slot. */
    sem_post(&(mem->more));

    /* Done. */
    sem_post(&(mem->lock));
    return 0;
}       

但请注意,我是凭记忆写的,而不是从我的测试程序中复制粘贴的,因为我希望你学习而不是在不理解(和怀疑)它的情况下复制粘贴其他人的代码。

Why do we need separate counters (first_item, num_items) when we have the semaphores, with corresponding values?

因为我们无法在sem_wait()succeeded/continued/stopped阻塞的地方捕获信号量值

例如,最初 room 信号量被初始化为 MAX_ITEMS,因此最多可以 运行 并行。在 sem_wait() 之后 运行ning sem_getvalue() 中的任何一个都将获得一些 later 值,而不是导致 sem_wait() 的值或转换到 return。 (即使使用 SysV 信号量,您也无法获得导致此进程等待 return 的信号量值。)

因此,我们认为 more 信号量的值是可以在不阻塞的情况下从缓冲区中出列多少次,而不是缓冲区的索引或计数器,而 room 是具有可以在不阻塞的情况下排队到缓冲区多少次的值。 lock 信号量授予独占访问权限,因此我们可以原子地修改共享内存结构(好吧,next_itemnum_items),而无需不同的进程同时尝试更改值。

我不是 100% 确定这是最好的或最佳的模式,这是最常用的模式之一。它不像我想要的那样健壮:对于 num_items 中的每个增量(一个),必须在 more 上 post 恰好一次;对于 num_items 中的每个减量(1),必须将 next_item 恰好增加 1 并且 post 在 room 上恰好增加一次,否则该方案将崩溃​​。

不过还有最后一个问题:

How do producers indicate they are done? How would the scheduler tell producers and/or consumers to stop?

我的首选解决方案是在共享内存结构中添加一个标志,比如 unsigned int status;,使用特定的位掩码告诉生产者和消费者要做什么,在等待 [=64 后立即检查=]:

#define  STOP_PRODUCING  (1 << 0)
#define  STOP_CONSUMING  (1 << 1)

static inline int  shared_get(struct shared *const mem, item_type *const into)
{
    int  err;

    if (!mem || !into)
        return errno = EINVAL; /* Set errno = EINVAL, and return EINVAL. */

    /* Wait for the next item in the buffer. */
    do {
        err = sem_wait(&(mem->more));
    } while (err == -1 && errno == EINTR);
    if (err)
        return errno;

    /* Exclusive access to the structure. */
    do {
        err = sem_wait(&(mem->lock));
    } while (err == -1 && errno == EINTR);

    /* Need to stop consuming? */
    if (mem->state & STOP_CONSUMING) {
        /* Ensure all consumers see the state immediately */
        sem_post(&(mem->more));
        sem_post(&(mem->lock));
        /* ENOMSG == please stop. */
        return errno = ENOMSG;
    }

    /* Copy item to caller storage. */
    *into = mem->item[mem->next_item];

    /* Update queue state. */
    mem->next_item = (mem->next_item + 1) % MAX_ITEMS;
    mem->num_items--;

    /* Account for the newly freed slot. */
    sem_post(&(mem->room));

    /* Done. */
    sem_post(&(mem->lock));
    return 0;
}       

static inline int  shared_put(struct shared *const mem, const item_type *const from)
    int  err;

    if (!mem || !into)
        return errno = EINVAL; /* Set errno = EINVAL, and return EINVAL. */

    /* Wait for room in the buffer. */
    do {
        err = sem_wait(&(mem->room));
    } while (err == -1 && errno == EINTR);
    if (err)
        return errno;

    /* Exclusive access to the structure. */
    do {
        err = sem_wait(&(mem->lock));
    } while (err == -1 && errno == EINTR);

    /* Time to stop? */
    if (mem->state & STOP_PRODUCING) {
        /* Ensure all producers see the state immediately */
        sem_post(&(mem->lock));
        sem_post(&(mem->room));
        /* ENOMSG == please stop. */
        return errno = ENOMSG;
    }

    /* Copy item to queue. */
    mem->item[(mem->next_item + mem->num_items) % MAX_ITEMS] = *from;

    /* Update queue state. */
    mem->num_items++;

    /* Account for the newly filled slot. */
    sem_post(&(mem->more));

    /* Done. */
    sem_post(&(mem->lock));
    return 0;
}

which return ENOMSG 给调用者如果调用者应该停止。当状态改变时,当然应该持有lock。添加 STOP_PRODUCING 时,还应该在 room 信号量上 post (一次)以启动“级联”,以便所有生产者停止;并且在 more 信号量上添加 STOP_CONSUMING、post 时(一次)启动消费者停止级联。 (他们每个人都会post再上一次,以确保每个producer/consumer尽快看到状态。)

不过还有其他方案;例如信号(设置 volatile sig_atomic_t 标志),但通常很难确保没有竞争 windows:进程在更改标志之前检查标志,然后阻塞信号量。

在这个方案中,最好同时验证MAX_ITEMS + NUM_PRODUCERS <= SEM_VALUE_MAXMAX_ITEMS + NUM_CONSUMERS <= SEM_VALUE_MAX,这样即使在停止级联期间,信号量值也不会溢出。