生产者/消费者任务。正确写入共享缓冲区的问题
producer / consumer task. Problem with correct writing to shared buffer
我正在从事一个解决生产者/消费者调度的经典问题的项目。
Linux打开Suse 42.3 Leep,APISystem V,C语言
该项目由三个程序组成:producer、consumer和scheduler。
调度程序的目的是创建 3 个信号量,共享内存,其中有一个缓冲区(数组),其中有写入(生产者)和读取(消费者)以及 运行 n 个生产者和 m 个消费者进程。
每个生产者必须对缓冲区执行 k 个写入周期,而消费者必须执行 k 个读取周期。
使用了 3 个信号量:互斥、空和满。全信号量的值在程序中用作数组中的索引。
问题是:例如,当缓冲区大小为3时,生产者写入4份数据,当缓冲区大小为4 - 5份数据(虽然应该是4份)...
消费者正常阅读。
此外,该程序在调用 get_semVal 函数时无法预测。
请帮忙,我将非常非常感谢您的回答。
制作人
#define BUFFER_SIZE 3
#define MY_RAND_MAX 99 // Highest integer for random number generator
#define LOOP 3 //the number of write / read cycles for each process
#define DATA_DIMENSION 4 // size of portion of data for 1 iteration
struct Data {
int buf[DATA_DIMENSION];
};
typedef struct Data buffer_item;
buffer_item buffer[BUFFER_SIZE];
void P(int semid)
{
struct sembuf op;
op.sem_num = 0;
op.sem_op = -1;
op.sem_flg = 0;
semop(semid,&op,1);
}
void V(int semid)
{
struct sembuf op;
op.sem_num = 0;
op.sem_op = +1;
op.sem_flg = 0;
semop(semid,&op,1);
}
void Init(int semid,int index,int value)
{
semctl(semid,index,SETVAL,value);
}
int get_semVal(int sem_id)
{
int value = semctl(sem_id,0,GETVAL,0);
return value;
}
int main()
{
sem_mutex = semget(KEY_MUTEX,1,0);
sem_empty = semget(KEY_EMPTY,1,0);
sem_full = semget(KEY_FULL,1,0);
srand(time(NULL));
const int SIZE = sizeof(buffer[BUFFER_SIZE]);
shm_id = shmget(KEY_SHARED_MEMORY,SIZE, 0);
int i=0;
buffer_item *adr;
do {
buffer_item nextProduced;
P(sem_empty);
P(sem_mutex);
//prepare portion of data
for(int j=0;j<DATA_DIMENSION;j++)
{
nextProduced.buf[j]=rand()%5;
}
adr = (buffer_item*)shmat(shm_id,NULL,0);
int full_value = get_semVal(sem_full);//get index of array
printf("-----%d------\n",full_value-1);//it’s for test the index of array in buffer
// write the generated portion of data by index full_value-1
adr[full_value-1].buf[0] = nextProduced.buf[0];
adr[full_value-1].buf[1] = nextProduced.buf[1];
adr[full_value-1].buf[2] = nextProduced.buf[2];
adr[full_value-1].buf[3] = nextProduced.buf[3];
shmdt(adr);
printf("producer %d produced %d %d %d %d\n", getpid(), nextProduced.buf[0],nextProduced.buf[1],nextProduced.buf[2],nextProduced.buf[3]);
V(sem_mutex);
V(sem_full);
i++;
} while (i<LOOP);
V(sem_empty);
sleep(1);
}
消费者
…
int main()
{
sem_mutex = semget(KEY_MUTEX,1,0);
sem_empty = semget(KEY_EMPTY,1,0);
sem_full = semget(KEY_FULL,1,0);
srand(time(NULL));
const int SIZE = sizeof(buffer[BUFFER_SIZE]);
shm_id = shmget(KEY_SHARED_MEMORY,SIZE,0);
int i=0;
buffer_item *adr;
do
{
buffer_item nextConsumed;
P(sem_full);
P(sem_mutex);
int full_value = get_semVal(sem_full);
adr = (buffer_item*)shmat(shm_id,NULL,0);
for(int i=0;i<BUFFER_SIZE;i++)
{
printf("--%d %d %d %d\n",adr[i].buf[0],adr[i].buf[1],adr[i].buf[2],adr[i].buf[3]);
}
for(int i=0;i<BUFFER_SIZE;i++)
{
buffer[i].buf[0] = adr[i].buf[0];
buffer[i].buf[1] = adr[i].buf[1];
buffer[i].buf[2] = adr[i].buf[2];
buffer[i].buf[3] = adr[i].buf[3];
}
tab(nextConsumed);
nextConsumed.buf[0]=buffer[full_value-1].buf[0];
nextConsumed.buf[1]=buffer[full_value-1].buf[1];
nextConsumed.buf[2]=buffer[full_value-1].buf[2];
nextConsumed.buf[3]=buffer[full_value-1].buf[3];
// Set buffer to 0 since we consumed that item
for(int j=0;j<DATA_DIMENSION;j++)
{
buffer[full_value-1].buf[j]=0;
}
for(int i=0;i<BUFFER_SIZE;i++)
{
adr[i].buf[0]=buffer[i].buf[0];
adr[i].buf[1]=buffer[i].buf[1];
adr[i].buf[2]=buffer[i].buf[2];
adr[i].buf[3]=buffer[i].buf[3];
}
shmdt(adr);
printf("consumer %d consumed %d %d %d %d\n", getpid() ,nextConsumed.buf[0],nextConsumed.buf[1],nextConsumed.buf[2],nextConsumed.buf[3]);
V(sem_mutex);
// increase empty
V(sem_empty);
i++;
} while (i<LOOP);
V(sem_full);
sleep(1);
}
调度程序
…
struct Data {
int buf[DATA_DIMENSION];
};
typedef struct Data buffer_item;
buffer_item buffer[BUFFER_SIZE];
struct TProcList
{
pid_t processPid;
};
typedef struct TProcList ProcList;
…
ProcList createProcess(char *name)
{
pid_t pid;
ProcList a;
pid = fork();
if (!pid){
kill(getpid(),SIGSTOP);
execl(name,name,NULL);
exit(0);
}
else if(pid){
a.processPid=pid;
}
else
cout<<"error forking"<<endl;
return a;
}
int main()
{
sem_mutex = semget(KEY_MUTEX,1,IPC_CREAT|0600);
sem_empty = semget(KEY_EMPTY,1,IPC_CREAT|0600);
sem_full = semget(KEY_FULL,1,IPC_CREAT|0600);
Init(sem_mutex,0,1);//unlock mutex
Init(sem_empty,0,BUFFER_SIZE);
Init(sem_full,0,0);//unlock empty
const int SIZE = sizeof(buffer[BUFFER_SIZE]);
shm_id = shmget(KEY_SHARED_MEMORY,SIZE,IPC_CREAT|0600);
buffer_item *adr;
adr = (buffer_item*)shmat(shm_id,NULL,0);
for(int i=0;i<BUFFER_SIZE;i++)
{
buffer[i].buf[0]=0;
buffer[i].buf[1]=0;
buffer[i].buf[2]=0;
buffer[i].buf[3]=0;
}
for(int i=0;i<BUFFER_SIZE;i++)
{
adr[i].buf[0] = buffer[i].buf[0];
adr[i].buf[1] = buffer[i].buf[1];
adr[i].buf[2] = buffer[i].buf[2];
adr[i].buf[3] = buffer[i].buf[3];
}
int consumerNumber = 2;
int produserNumber = 2;
ProcList producer_pids[produserNumber];
ProcList consumer_pids[consumerNumber];
for(int i=0;i<produserNumber;i++)
{
producer_pids[i]=createProcess("/home/andrey/build-c-unknown-Debug/c");//create sleeping processes
}
for(int i=0;i<consumerNumber;i++)
{
consumer_pids[i]=createProcess("/home/andrey/build-p-unknown-Debug/p");
}
sleep(3);
for(int i=0;i<produserNumber;i++)
{
kill(producer_pids[i].processPid,SIGCONT);//continue processes
sleep(1);
}
for(int i=0;i<consumerNumber;i++)
{
kill(consumer_pids[i].processPid,SIGCONT);
sleep(1);
}
for(int i=0;i<produserNumber;i++)
{
waitpid(producer_pids[i].processPid,&stat,WNOHANG);//wait
}
for(int i=0;i<consumerNumber;i++)
{
waitpid(consumer_pids[i].processPid,&stat,WNOHANG);
}
shmdt(adr);
semctl(sem_mutex,0,IPC_RMID);
semctl(sem_full,0,IPC_RMID);
semctl(sem_empty,0,IPC_RMID);
}
尝试解开别人编写的未注释代码并不有趣,因此,我将解释一个经过验证的工作方案。
(请注意,注释应始终解释程序员的意图或想法,而不是代码 做了什么 ;我们可以阅读代码以查看它的作用。问题是,我们需要首先了解程序员 idea/intent,然后我们才能将其与实现进行比较。如果没有注释,我需要先阅读代码以尝试猜测意图,然后将其与代码本身进行比较;它是就像双倍的工作。)
(我怀疑 OP 的潜在问题是试图使用信号量值作为缓冲区索引,但没有仔细检查所有代码以确保 100% 确定。)
假设共享内存结构如下所示:
struct shared {
sem_t lock; /* Initialized to value 1 */
sem_t more; /* Initialized to 0 */
sem_t room; /* Initialized to MAX_ITEMS */
size_t num_items; /* Initialized to 0 */
size_t next_item; /* Initialized to 0 */
item_type item[MAX_ITEMS];
};
我们有struct shared *mem
指向共享内存区域。
请注意,您应该在 运行 时包含 <limits.h>
,并验证 MAX_ITEMS <= SEM_VALUE_MAX
。否则 MAX_ITEMS
太大,这个信号量方案可能会失败。 (Linux上的SEM_VALUE_MAX
通常是INT_MAX
,足够大了,但可能会有所不同。而且,如果编译时使用-O
进行优化,检查将被完全优化离开。所以这是一张非常便宜且合理的支票。)
mem->lock
信号量像互斥锁一样使用。也就是说,为了独占访问锁定结构,一个进程等待它。完成后,post就可以了。
请注意,虽然 sem_post(&(mem->lock))
总是会成功(忽略 mem
为 NULL 或指向未初始化的内存或被垃圾覆盖等错误),但从技术上讲,sem_wait()
可以被中断通过向未安装 SA_RESTART
标志的用户空间处理程序传递信号。这就是为什么我建议使用静态内联辅助函数而不是 sem_wait()
:
static inline int semaphore_wait(sem_t *const s)
{
int result;
do {
result = sem_wait(s);
} while (result == -1 && errno == EINTR);
return result;
}
static inline int semaphore_post(sem_t *const s)
{
return sem_post(s);
}
在信号传递不应该中断等待信号量的情况下,您使用 semaphore_wait()
。如果您确实希望信号传递中断对信号量的等待,请使用 sem_wait()
;如果它 returns -1
和 errno == EINTR
,操作由于信号传递而中断,并且信号量实际上并没有减少。 (许多其他低级函数,如 read()
、write()
、send()
、recv()
,可以完全相同的方式被中断;它们也可以只是 return一个短计数,以防中途中断。)
semaphore_post()
只是一个包装器,因此您可以使用“匹配`post 和等待操作。做那种“无用”的包装器确实有助于理解代码,您看。
item[]
数组用作循环队列。 num_items
表示其中的项目数。如果 num_items > 0
,下一个要消耗的项目是 item[next_item]
。如果num_items < MAX_ITEMS
,下一个要生产的项目是item[(next_item + num_items) % MAX_ITEMS]
。
%
是模运算符。这里,因为 next_item
和 num_items
总是正数,所以 (next_item + num_items) % MAX_ITEMS
总是在 0
和 MAX_ITEMS - 1
之间,包括在内。这就是缓冲区循环的原因。
当一个生产者构造了一个新的项目,比如说item_type newitem;
,并想把它添加到共享内存中,它基本上会做以下事情:
/* Omitted: Initialize and fill in 'newitem' members */
/* Wait until there is room in the buffer */
semaphore_wait(&(mem->room));
/* Get exclusive access to the structure members */
semaphore_wait(&(mem->lock));
mem->item[(mem->next_item + mem->num_items) % MAX_ITEMS] = newitem;
mem->num_items++;
sem_post(&(mem->more));
semaphore_post(&(mem->lock));
上面通常被称为enqueue,因为它将一个项目追加到队列中(恰好通过循环缓冲区实现)。
当消费者想要消费共享缓冲区中的项目 (item_type nextitem;
) 时,它会执行以下操作:
/* Wait until there are items in the buffer */
semaphore_wait(&(mem->more));
/* Get exclusive access to the structure members */
semaphore_wait(&(mem->lock));
nextitem = mem->item[mem->next_item];
mem->next_item = (mem->next_item + 1) % MAX_ITEMS;
mem->num_items = mem->num_items - 1;
semaphore_post(&(mem->room));
mem->item[(mem->next_item + mem->num_items) % MAX_ITEMS] = newitem;
mem->num_items++;
sem_post(&(mem->more));
semaphore_post(&(mem->lock));
/* Omitted: Do work on 'nextitem' here. */
这通常被称为dequeue,因为它从队列中获取下一个项目。
我建议你先写一个单进程测试用例,入队 MAX_ITEMS
,然后出队,并验证信号量值是否恢复到初始值。这并不能保证正确性,但可以解决最典型的错误。
实际上,我个人会在描述共享内存结构的同一个头文件中将排队函数编写为静态内联助手。差不多
static inline int shared_get(struct shared *const mem, item_type *const into)
{
int err;
if (!mem || !into)
return errno = EINVAL; /* Set errno = EINVAL, and return EINVAL. */
/* Wait for the next item in the buffer. */
do {
err = sem_wait(&(mem->more));
} while (err == -1 && errno == EINTR);
if (err)
return errno;
/* Exclusive access to the structure. */
do {
err = sem_wait(&(mem->lock));
} while (err == -1 && errno == EINTR);
/* Copy item to caller storage. */
*into = mem->item[mem->next_item];
/* Update queue state. */
mem->next_item = (mem->next_item + 1) % MAX_ITEMS;
mem->num_items--;
/* Account for the newly freed slot. */
sem_post(&(mem->room));
/* Done. */
sem_post(&(mem->lock));
return 0;
}
和
static inline int shared_put(struct shared *const mem, const item_type *const from)
int err;
if (!mem || !into)
return errno = EINVAL; /* Set errno = EINVAL, and return EINVAL. */
/* Wait for room in the buffer. */
do {
err = sem_wait(&(mem->room));
} while (err == -1 && errno == EINTR);
if (err)
return errno;
/* Exclusive access to the structure. */
do {
err = sem_wait(&(mem->lock));
} while (err == -1 && errno == EINTR);
/* Copy item to queue. */
mem->item[(mem->next_item + mem->num_items) % MAX_ITEMS] = *from;
/* Update queue state. */
mem->num_items++;
/* Account for the newly filled slot. */
sem_post(&(mem->more));
/* Done. */
sem_post(&(mem->lock));
return 0;
}
但请注意,我是凭记忆写的,而不是从我的测试程序中复制粘贴的,因为我希望你学习而不是在不理解(和怀疑)它的情况下复制粘贴其他人的代码。
Why do we need separate counters (first_item
, num_items
) when we have the semaphores, with corresponding values?
因为我们无法在sem_wait()
succeeded/continued/stopped阻塞的地方捕获信号量值
例如,最初 room
信号量被初始化为 MAX_ITEMS
,因此最多可以 运行 并行。在 sem_wait()
之后 运行ning sem_getvalue()
中的任何一个都将获得一些 later 值,而不是导致 sem_wait()
的值或转换到 return。 (即使使用 SysV 信号量,您也无法获得导致此进程等待 return 的信号量值。)
因此,我们认为 more
信号量的值是可以在不阻塞的情况下从缓冲区中出列多少次,而不是缓冲区的索引或计数器,而 room
是具有可以在不阻塞的情况下排队到缓冲区多少次的值。 lock
信号量授予独占访问权限,因此我们可以原子地修改共享内存结构(好吧,next_item
和 num_items
),而无需不同的进程同时尝试更改值。
我不是 100% 确定这是最好的或最佳的模式,这是最常用的模式之一。它不像我想要的那样健壮:对于 num_items
中的每个增量(一个),必须在 more
上 post 恰好一次;对于 num_items
中的每个减量(1),必须将 next_item
恰好增加 1 并且 post 在 room
上恰好增加一次,否则该方案将崩溃。
不过还有最后一个问题:
How do producers indicate they are done?
How would the scheduler tell producers and/or consumers to stop?
我的首选解决方案是在共享内存结构中添加一个标志,比如 unsigned int status;
,使用特定的位掩码告诉生产者和消费者要做什么,在等待 [=64 后立即检查=]:
#define STOP_PRODUCING (1 << 0)
#define STOP_CONSUMING (1 << 1)
static inline int shared_get(struct shared *const mem, item_type *const into)
{
int err;
if (!mem || !into)
return errno = EINVAL; /* Set errno = EINVAL, and return EINVAL. */
/* Wait for the next item in the buffer. */
do {
err = sem_wait(&(mem->more));
} while (err == -1 && errno == EINTR);
if (err)
return errno;
/* Exclusive access to the structure. */
do {
err = sem_wait(&(mem->lock));
} while (err == -1 && errno == EINTR);
/* Need to stop consuming? */
if (mem->state & STOP_CONSUMING) {
/* Ensure all consumers see the state immediately */
sem_post(&(mem->more));
sem_post(&(mem->lock));
/* ENOMSG == please stop. */
return errno = ENOMSG;
}
/* Copy item to caller storage. */
*into = mem->item[mem->next_item];
/* Update queue state. */
mem->next_item = (mem->next_item + 1) % MAX_ITEMS;
mem->num_items--;
/* Account for the newly freed slot. */
sem_post(&(mem->room));
/* Done. */
sem_post(&(mem->lock));
return 0;
}
static inline int shared_put(struct shared *const mem, const item_type *const from)
int err;
if (!mem || !into)
return errno = EINVAL; /* Set errno = EINVAL, and return EINVAL. */
/* Wait for room in the buffer. */
do {
err = sem_wait(&(mem->room));
} while (err == -1 && errno == EINTR);
if (err)
return errno;
/* Exclusive access to the structure. */
do {
err = sem_wait(&(mem->lock));
} while (err == -1 && errno == EINTR);
/* Time to stop? */
if (mem->state & STOP_PRODUCING) {
/* Ensure all producers see the state immediately */
sem_post(&(mem->lock));
sem_post(&(mem->room));
/* ENOMSG == please stop. */
return errno = ENOMSG;
}
/* Copy item to queue. */
mem->item[(mem->next_item + mem->num_items) % MAX_ITEMS] = *from;
/* Update queue state. */
mem->num_items++;
/* Account for the newly filled slot. */
sem_post(&(mem->more));
/* Done. */
sem_post(&(mem->lock));
return 0;
}
which return ENOMSG
给调用者如果调用者应该停止。当状态改变时,当然应该持有lock
。添加 STOP_PRODUCING
时,还应该在 room
信号量上 post (一次)以启动“级联”,以便所有生产者停止;并且在 more
信号量上添加 STOP_CONSUMING
、post 时(一次)启动消费者停止级联。 (他们每个人都会post再上一次,以确保每个producer/consumer尽快看到状态。)
不过还有其他方案;例如信号(设置 volatile sig_atomic_t
标志),但通常很难确保没有竞争 windows:进程在更改标志之前检查标志,然后阻塞信号量。
在这个方案中,最好同时验证MAX_ITEMS + NUM_PRODUCERS <= SEM_VALUE_MAX
和MAX_ITEMS + NUM_CONSUMERS <= SEM_VALUE_MAX
,这样即使在停止级联期间,信号量值也不会溢出。
我正在从事一个解决生产者/消费者调度的经典问题的项目。 Linux打开Suse 42.3 Leep,APISystem V,C语言
该项目由三个程序组成:producer、consumer和scheduler。 调度程序的目的是创建 3 个信号量,共享内存,其中有一个缓冲区(数组),其中有写入(生产者)和读取(消费者)以及 运行 n 个生产者和 m 个消费者进程。
每个生产者必须对缓冲区执行 k 个写入周期,而消费者必须执行 k 个读取周期。
使用了 3 个信号量:互斥、空和满。全信号量的值在程序中用作数组中的索引。
问题是:例如,当缓冲区大小为3时,生产者写入4份数据,当缓冲区大小为4 - 5份数据(虽然应该是4份)...
消费者正常阅读。
此外,该程序在调用 get_semVal 函数时无法预测。
请帮忙,我将非常非常感谢您的回答。
制作人
#define BUFFER_SIZE 3
#define MY_RAND_MAX 99 // Highest integer for random number generator
#define LOOP 3 //the number of write / read cycles for each process
#define DATA_DIMENSION 4 // size of portion of data for 1 iteration
struct Data {
int buf[DATA_DIMENSION];
};
typedef struct Data buffer_item;
buffer_item buffer[BUFFER_SIZE];
void P(int semid)
{
struct sembuf op;
op.sem_num = 0;
op.sem_op = -1;
op.sem_flg = 0;
semop(semid,&op,1);
}
void V(int semid)
{
struct sembuf op;
op.sem_num = 0;
op.sem_op = +1;
op.sem_flg = 0;
semop(semid,&op,1);
}
void Init(int semid,int index,int value)
{
semctl(semid,index,SETVAL,value);
}
int get_semVal(int sem_id)
{
int value = semctl(sem_id,0,GETVAL,0);
return value;
}
int main()
{
sem_mutex = semget(KEY_MUTEX,1,0);
sem_empty = semget(KEY_EMPTY,1,0);
sem_full = semget(KEY_FULL,1,0);
srand(time(NULL));
const int SIZE = sizeof(buffer[BUFFER_SIZE]);
shm_id = shmget(KEY_SHARED_MEMORY,SIZE, 0);
int i=0;
buffer_item *adr;
do {
buffer_item nextProduced;
P(sem_empty);
P(sem_mutex);
//prepare portion of data
for(int j=0;j<DATA_DIMENSION;j++)
{
nextProduced.buf[j]=rand()%5;
}
adr = (buffer_item*)shmat(shm_id,NULL,0);
int full_value = get_semVal(sem_full);//get index of array
printf("-----%d------\n",full_value-1);//it’s for test the index of array in buffer
// write the generated portion of data by index full_value-1
adr[full_value-1].buf[0] = nextProduced.buf[0];
adr[full_value-1].buf[1] = nextProduced.buf[1];
adr[full_value-1].buf[2] = nextProduced.buf[2];
adr[full_value-1].buf[3] = nextProduced.buf[3];
shmdt(adr);
printf("producer %d produced %d %d %d %d\n", getpid(), nextProduced.buf[0],nextProduced.buf[1],nextProduced.buf[2],nextProduced.buf[3]);
V(sem_mutex);
V(sem_full);
i++;
} while (i<LOOP);
V(sem_empty);
sleep(1);
}
消费者
…
int main()
{
sem_mutex = semget(KEY_MUTEX,1,0);
sem_empty = semget(KEY_EMPTY,1,0);
sem_full = semget(KEY_FULL,1,0);
srand(time(NULL));
const int SIZE = sizeof(buffer[BUFFER_SIZE]);
shm_id = shmget(KEY_SHARED_MEMORY,SIZE,0);
int i=0;
buffer_item *adr;
do
{
buffer_item nextConsumed;
P(sem_full);
P(sem_mutex);
int full_value = get_semVal(sem_full);
adr = (buffer_item*)shmat(shm_id,NULL,0);
for(int i=0;i<BUFFER_SIZE;i++)
{
printf("--%d %d %d %d\n",adr[i].buf[0],adr[i].buf[1],adr[i].buf[2],adr[i].buf[3]);
}
for(int i=0;i<BUFFER_SIZE;i++)
{
buffer[i].buf[0] = adr[i].buf[0];
buffer[i].buf[1] = adr[i].buf[1];
buffer[i].buf[2] = adr[i].buf[2];
buffer[i].buf[3] = adr[i].buf[3];
}
tab(nextConsumed);
nextConsumed.buf[0]=buffer[full_value-1].buf[0];
nextConsumed.buf[1]=buffer[full_value-1].buf[1];
nextConsumed.buf[2]=buffer[full_value-1].buf[2];
nextConsumed.buf[3]=buffer[full_value-1].buf[3];
// Set buffer to 0 since we consumed that item
for(int j=0;j<DATA_DIMENSION;j++)
{
buffer[full_value-1].buf[j]=0;
}
for(int i=0;i<BUFFER_SIZE;i++)
{
adr[i].buf[0]=buffer[i].buf[0];
adr[i].buf[1]=buffer[i].buf[1];
adr[i].buf[2]=buffer[i].buf[2];
adr[i].buf[3]=buffer[i].buf[3];
}
shmdt(adr);
printf("consumer %d consumed %d %d %d %d\n", getpid() ,nextConsumed.buf[0],nextConsumed.buf[1],nextConsumed.buf[2],nextConsumed.buf[3]);
V(sem_mutex);
// increase empty
V(sem_empty);
i++;
} while (i<LOOP);
V(sem_full);
sleep(1);
}
调度程序
…
struct Data {
int buf[DATA_DIMENSION];
};
typedef struct Data buffer_item;
buffer_item buffer[BUFFER_SIZE];
struct TProcList
{
pid_t processPid;
};
typedef struct TProcList ProcList;
…
ProcList createProcess(char *name)
{
pid_t pid;
ProcList a;
pid = fork();
if (!pid){
kill(getpid(),SIGSTOP);
execl(name,name,NULL);
exit(0);
}
else if(pid){
a.processPid=pid;
}
else
cout<<"error forking"<<endl;
return a;
}
int main()
{
sem_mutex = semget(KEY_MUTEX,1,IPC_CREAT|0600);
sem_empty = semget(KEY_EMPTY,1,IPC_CREAT|0600);
sem_full = semget(KEY_FULL,1,IPC_CREAT|0600);
Init(sem_mutex,0,1);//unlock mutex
Init(sem_empty,0,BUFFER_SIZE);
Init(sem_full,0,0);//unlock empty
const int SIZE = sizeof(buffer[BUFFER_SIZE]);
shm_id = shmget(KEY_SHARED_MEMORY,SIZE,IPC_CREAT|0600);
buffer_item *adr;
adr = (buffer_item*)shmat(shm_id,NULL,0);
for(int i=0;i<BUFFER_SIZE;i++)
{
buffer[i].buf[0]=0;
buffer[i].buf[1]=0;
buffer[i].buf[2]=0;
buffer[i].buf[3]=0;
}
for(int i=0;i<BUFFER_SIZE;i++)
{
adr[i].buf[0] = buffer[i].buf[0];
adr[i].buf[1] = buffer[i].buf[1];
adr[i].buf[2] = buffer[i].buf[2];
adr[i].buf[3] = buffer[i].buf[3];
}
int consumerNumber = 2;
int produserNumber = 2;
ProcList producer_pids[produserNumber];
ProcList consumer_pids[consumerNumber];
for(int i=0;i<produserNumber;i++)
{
producer_pids[i]=createProcess("/home/andrey/build-c-unknown-Debug/c");//create sleeping processes
}
for(int i=0;i<consumerNumber;i++)
{
consumer_pids[i]=createProcess("/home/andrey/build-p-unknown-Debug/p");
}
sleep(3);
for(int i=0;i<produserNumber;i++)
{
kill(producer_pids[i].processPid,SIGCONT);//continue processes
sleep(1);
}
for(int i=0;i<consumerNumber;i++)
{
kill(consumer_pids[i].processPid,SIGCONT);
sleep(1);
}
for(int i=0;i<produserNumber;i++)
{
waitpid(producer_pids[i].processPid,&stat,WNOHANG);//wait
}
for(int i=0;i<consumerNumber;i++)
{
waitpid(consumer_pids[i].processPid,&stat,WNOHANG);
}
shmdt(adr);
semctl(sem_mutex,0,IPC_RMID);
semctl(sem_full,0,IPC_RMID);
semctl(sem_empty,0,IPC_RMID);
}
尝试解开别人编写的未注释代码并不有趣,因此,我将解释一个经过验证的工作方案。
(请注意,注释应始终解释程序员的意图或想法,而不是代码 做了什么 ;我们可以阅读代码以查看它的作用。问题是,我们需要首先了解程序员 idea/intent,然后我们才能将其与实现进行比较。如果没有注释,我需要先阅读代码以尝试猜测意图,然后将其与代码本身进行比较;它是就像双倍的工作。)
(我怀疑 OP 的潜在问题是试图使用信号量值作为缓冲区索引,但没有仔细检查所有代码以确保 100% 确定。)
假设共享内存结构如下所示:
struct shared {
sem_t lock; /* Initialized to value 1 */
sem_t more; /* Initialized to 0 */
sem_t room; /* Initialized to MAX_ITEMS */
size_t num_items; /* Initialized to 0 */
size_t next_item; /* Initialized to 0 */
item_type item[MAX_ITEMS];
};
我们有struct shared *mem
指向共享内存区域。
请注意,您应该在 运行 时包含 <limits.h>
,并验证 MAX_ITEMS <= SEM_VALUE_MAX
。否则 MAX_ITEMS
太大,这个信号量方案可能会失败。 (Linux上的SEM_VALUE_MAX
通常是INT_MAX
,足够大了,但可能会有所不同。而且,如果编译时使用-O
进行优化,检查将被完全优化离开。所以这是一张非常便宜且合理的支票。)
mem->lock
信号量像互斥锁一样使用。也就是说,为了独占访问锁定结构,一个进程等待它。完成后,post就可以了。
请注意,虽然 sem_post(&(mem->lock))
总是会成功(忽略 mem
为 NULL 或指向未初始化的内存或被垃圾覆盖等错误),但从技术上讲,sem_wait()
可以被中断通过向未安装 SA_RESTART
标志的用户空间处理程序传递信号。这就是为什么我建议使用静态内联辅助函数而不是 sem_wait()
:
static inline int semaphore_wait(sem_t *const s)
{
int result;
do {
result = sem_wait(s);
} while (result == -1 && errno == EINTR);
return result;
}
static inline int semaphore_post(sem_t *const s)
{
return sem_post(s);
}
在信号传递不应该中断等待信号量的情况下,您使用 semaphore_wait()
。如果您确实希望信号传递中断对信号量的等待,请使用 sem_wait()
;如果它 returns -1
和 errno == EINTR
,操作由于信号传递而中断,并且信号量实际上并没有减少。 (许多其他低级函数,如 read()
、write()
、send()
、recv()
,可以完全相同的方式被中断;它们也可以只是 return一个短计数,以防中途中断。)
semaphore_post()
只是一个包装器,因此您可以使用“匹配`post 和等待操作。做那种“无用”的包装器确实有助于理解代码,您看。
item[]
数组用作循环队列。 num_items
表示其中的项目数。如果 num_items > 0
,下一个要消耗的项目是 item[next_item]
。如果num_items < MAX_ITEMS
,下一个要生产的项目是item[(next_item + num_items) % MAX_ITEMS]
。
%
是模运算符。这里,因为 next_item
和 num_items
总是正数,所以 (next_item + num_items) % MAX_ITEMS
总是在 0
和 MAX_ITEMS - 1
之间,包括在内。这就是缓冲区循环的原因。
当一个生产者构造了一个新的项目,比如说item_type newitem;
,并想把它添加到共享内存中,它基本上会做以下事情:
/* Omitted: Initialize and fill in 'newitem' members */
/* Wait until there is room in the buffer */
semaphore_wait(&(mem->room));
/* Get exclusive access to the structure members */
semaphore_wait(&(mem->lock));
mem->item[(mem->next_item + mem->num_items) % MAX_ITEMS] = newitem;
mem->num_items++;
sem_post(&(mem->more));
semaphore_post(&(mem->lock));
上面通常被称为enqueue,因为它将一个项目追加到队列中(恰好通过循环缓冲区实现)。
当消费者想要消费共享缓冲区中的项目 (item_type nextitem;
) 时,它会执行以下操作:
/* Wait until there are items in the buffer */
semaphore_wait(&(mem->more));
/* Get exclusive access to the structure members */
semaphore_wait(&(mem->lock));
nextitem = mem->item[mem->next_item];
mem->next_item = (mem->next_item + 1) % MAX_ITEMS;
mem->num_items = mem->num_items - 1;
semaphore_post(&(mem->room));
mem->item[(mem->next_item + mem->num_items) % MAX_ITEMS] = newitem;
mem->num_items++;
sem_post(&(mem->more));
semaphore_post(&(mem->lock));
/* Omitted: Do work on 'nextitem' here. */
这通常被称为dequeue,因为它从队列中获取下一个项目。
我建议你先写一个单进程测试用例,入队 MAX_ITEMS
,然后出队,并验证信号量值是否恢复到初始值。这并不能保证正确性,但可以解决最典型的错误。
实际上,我个人会在描述共享内存结构的同一个头文件中将排队函数编写为静态内联助手。差不多
static inline int shared_get(struct shared *const mem, item_type *const into)
{
int err;
if (!mem || !into)
return errno = EINVAL; /* Set errno = EINVAL, and return EINVAL. */
/* Wait for the next item in the buffer. */
do {
err = sem_wait(&(mem->more));
} while (err == -1 && errno == EINTR);
if (err)
return errno;
/* Exclusive access to the structure. */
do {
err = sem_wait(&(mem->lock));
} while (err == -1 && errno == EINTR);
/* Copy item to caller storage. */
*into = mem->item[mem->next_item];
/* Update queue state. */
mem->next_item = (mem->next_item + 1) % MAX_ITEMS;
mem->num_items--;
/* Account for the newly freed slot. */
sem_post(&(mem->room));
/* Done. */
sem_post(&(mem->lock));
return 0;
}
和
static inline int shared_put(struct shared *const mem, const item_type *const from)
int err;
if (!mem || !into)
return errno = EINVAL; /* Set errno = EINVAL, and return EINVAL. */
/* Wait for room in the buffer. */
do {
err = sem_wait(&(mem->room));
} while (err == -1 && errno == EINTR);
if (err)
return errno;
/* Exclusive access to the structure. */
do {
err = sem_wait(&(mem->lock));
} while (err == -1 && errno == EINTR);
/* Copy item to queue. */
mem->item[(mem->next_item + mem->num_items) % MAX_ITEMS] = *from;
/* Update queue state. */
mem->num_items++;
/* Account for the newly filled slot. */
sem_post(&(mem->more));
/* Done. */
sem_post(&(mem->lock));
return 0;
}
但请注意,我是凭记忆写的,而不是从我的测试程序中复制粘贴的,因为我希望你学习而不是在不理解(和怀疑)它的情况下复制粘贴其他人的代码。
Why do we need separate counters (
first_item
,num_items
) when we have the semaphores, with corresponding values?
因为我们无法在sem_wait()
succeeded/continued/stopped阻塞的地方捕获信号量值
例如,最初 room
信号量被初始化为 MAX_ITEMS
,因此最多可以 运行 并行。在 sem_wait()
之后 运行ning sem_getvalue()
中的任何一个都将获得一些 later 值,而不是导致 sem_wait()
的值或转换到 return。 (即使使用 SysV 信号量,您也无法获得导致此进程等待 return 的信号量值。)
因此,我们认为 more
信号量的值是可以在不阻塞的情况下从缓冲区中出列多少次,而不是缓冲区的索引或计数器,而 room
是具有可以在不阻塞的情况下排队到缓冲区多少次的值。 lock
信号量授予独占访问权限,因此我们可以原子地修改共享内存结构(好吧,next_item
和 num_items
),而无需不同的进程同时尝试更改值。
我不是 100% 确定这是最好的或最佳的模式,这是最常用的模式之一。它不像我想要的那样健壮:对于 num_items
中的每个增量(一个),必须在 more
上 post 恰好一次;对于 num_items
中的每个减量(1),必须将 next_item
恰好增加 1 并且 post 在 room
上恰好增加一次,否则该方案将崩溃。
不过还有最后一个问题:
How do producers indicate they are done? How would the scheduler tell producers and/or consumers to stop?
我的首选解决方案是在共享内存结构中添加一个标志,比如 unsigned int status;
,使用特定的位掩码告诉生产者和消费者要做什么,在等待 [=64 后立即检查=]:
#define STOP_PRODUCING (1 << 0)
#define STOP_CONSUMING (1 << 1)
static inline int shared_get(struct shared *const mem, item_type *const into)
{
int err;
if (!mem || !into)
return errno = EINVAL; /* Set errno = EINVAL, and return EINVAL. */
/* Wait for the next item in the buffer. */
do {
err = sem_wait(&(mem->more));
} while (err == -1 && errno == EINTR);
if (err)
return errno;
/* Exclusive access to the structure. */
do {
err = sem_wait(&(mem->lock));
} while (err == -1 && errno == EINTR);
/* Need to stop consuming? */
if (mem->state & STOP_CONSUMING) {
/* Ensure all consumers see the state immediately */
sem_post(&(mem->more));
sem_post(&(mem->lock));
/* ENOMSG == please stop. */
return errno = ENOMSG;
}
/* Copy item to caller storage. */
*into = mem->item[mem->next_item];
/* Update queue state. */
mem->next_item = (mem->next_item + 1) % MAX_ITEMS;
mem->num_items--;
/* Account for the newly freed slot. */
sem_post(&(mem->room));
/* Done. */
sem_post(&(mem->lock));
return 0;
}
static inline int shared_put(struct shared *const mem, const item_type *const from)
int err;
if (!mem || !into)
return errno = EINVAL; /* Set errno = EINVAL, and return EINVAL. */
/* Wait for room in the buffer. */
do {
err = sem_wait(&(mem->room));
} while (err == -1 && errno == EINTR);
if (err)
return errno;
/* Exclusive access to the structure. */
do {
err = sem_wait(&(mem->lock));
} while (err == -1 && errno == EINTR);
/* Time to stop? */
if (mem->state & STOP_PRODUCING) {
/* Ensure all producers see the state immediately */
sem_post(&(mem->lock));
sem_post(&(mem->room));
/* ENOMSG == please stop. */
return errno = ENOMSG;
}
/* Copy item to queue. */
mem->item[(mem->next_item + mem->num_items) % MAX_ITEMS] = *from;
/* Update queue state. */
mem->num_items++;
/* Account for the newly filled slot. */
sem_post(&(mem->more));
/* Done. */
sem_post(&(mem->lock));
return 0;
}
which return ENOMSG
给调用者如果调用者应该停止。当状态改变时,当然应该持有lock
。添加 STOP_PRODUCING
时,还应该在 room
信号量上 post (一次)以启动“级联”,以便所有生产者停止;并且在 more
信号量上添加 STOP_CONSUMING
、post 时(一次)启动消费者停止级联。 (他们每个人都会post再上一次,以确保每个producer/consumer尽快看到状态。)
不过还有其他方案;例如信号(设置 volatile sig_atomic_t
标志),但通常很难确保没有竞争 windows:进程在更改标志之前检查标志,然后阻塞信号量。
在这个方案中,最好同时验证MAX_ITEMS + NUM_PRODUCERS <= SEM_VALUE_MAX
和MAX_ITEMS + NUM_CONSUMERS <= SEM_VALUE_MAX
,这样即使在停止级联期间,信号量值也不会溢出。