使用共享内存(mmap)和信号量的进程间通信
inter process communication using shared memory(mmap) and semaphores
我正在尝试使用计数信号量即兴创作我为单生产者多消费者多线程编写的程序。我想使用共享内存(mmap() 系统调用)实现进程间通信。我想使用没有支持文件的匿名映射。
这是我想在父进程及其多个子进程之间共享的结构。
typedef struct Buffer{
char **Tuples;
sem_t buf_mutex,empty_count,fill_count;
} Buffer;
Buffer buffer[100];
父进程是 mapper()
函数,它根据一些输入产生一些东西并将其放入缓冲区 [i]。子进程转到 reducer()
函数,该函数使用缓冲区 [j] 中的内容。每个 reducer 或子进程都应该有权访问其缓冲区。子进程在main函数中forked()然后父进程控制到mapper()
。我已将同步原语初始化为进程共享。
我的 main()
方法是否正确?我还遇到 mmap()
的 return 值的类型转换错误,这是一个指针,但我不确定如何处理它然后使用它。我还认为 malloc()
不应在第 47 行中用于将 space 分配给元组,而应使用 mmap()
本身。有人可以帮忙吗?
这是我的程序 -
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <sys/mman.h>
typedef struct Buffer{
char **Tuples;
// int count;
sem_t buf_mutex,empty_count,fill_count;
} Buffer;
Buffer buffer[100];
int numOfSlots;
int numOfReducers;
void mapper(){
//Synchronization primitives (counting semaphores) used for synchronization
//Produce something and put it in buffer[i]
}
void reducer(long b){
//Synchronization primitives (counting semaphores) for synchronization
//Consume from buffer[b]
}
int main (int argc, char *argv[]){
if(argc != 3) {
if(argc < 3)
printf("Insufficient arguments passed\n");
else
printf("Too many arguments passed\n");
return 1;
}
int i;
long r;
numOfSlots = atoi(argv[1]);
numOfReducers = atoi(argv[2]);
for(i=0; i<numOfReducers; i++){
buffer[i] = (struct Buffer *) mmap(NULL, sizeof(buffer[i]), PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_ANONYMOUS, -1, 0);
if (buffer[i] == MAP_FAILED)
errExit("mmap");
buffer[i].Tuples = malloc(numOfSlots * sizeof(char *));
sem_init(&buffer[i].buf_mutex, 1, 1);
sem_init(&buffer[i].fill_count, 1, 0);
sem_init(&buffer[i].empty_count, 1, numOfSlots);
}
for(r=0;r<numOfReducers;r++){ // loop will run n times (n=5)
if(fork() == 0){
printf("[son] pid %d from [parent] pid %d\n",getpid(),getppid());
Reducer(r);
exit(0);
}
}
mapper();
for(r=0;r<numOfReducers;r++) // loop will run n times (n=5)
wait(NULL);
}
这些是我尝试访问的链接 -
https://computing.llnl.gov/tutorials/pthreads/man/pthread_mutexattr_init.txt
https://github.com/bradfa/tlpi-dist/blob/master/mmap/anon_mmap.c
谢谢,
哈里什
网上查了下。我想出了这个有效的解决方案。
我了解到的另一件事是 char *var="hello"
存储在文本段的只读内存中,这意味着子进程也可以访问它。所以 strcpy()
是其他任何事情的更好选择。
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <sys/mman.h>
#include <string.h>
typedef struct Buffer{
char **Tuples;
// int count;
sem_t buf_mutex,empty_count,fill_count;
int inSlotIndex;
int outSlotIndex;
} Buffer;
Buffer *buffer;
int numOfSlots;
int numOfReducers;
int *done;
void mapper(){
//Synchronization primitives (counting semaphores) used for synchronization
//read continuously from a file, produce something and put it in buffer[i]
//Here is an example
char *temp = "trail";
sem_wait(&buffer[1].empty_count);
sem_wait(&buffer[1].buf_mutex);
// Use strcpy() for anything other than string literal
buffer[1].Tuples[0] = temp;
buffer[1].inSlotIndex = 3;
buffer[1].outSlotIndex = 4;
sem_post(&buffer[1].buf_mutex);
sem_post(&buffer[1].fill_count);
*done = 1;
}
void reducer(long tid, Buffer *buffer, int *done){
//Synchronization primitives (counting semaphores) used for synchronization
//Consume from buffer[b]
sem_wait(&buffer[tid].fill_count);
sem_wait(&buffer[tid].buf_mutex);
printf("%s\n", buffer[tid].Tuples[0]);
printf("%d\n", buffer[tid].inSlotIndex);
printf("%d\n", buffer[tid].outSlotIndex);
sem_post(&buffer[tid].buf_mutex);
sem_post(&buffer[tid].empty_count);
if(*done == 1)
printf("DONE\n");
}
int main (int argc, char *argv[])
{
if(argc != 3) {
if(argc < 3)
printf("Insufficient arguments passed\n");
else
printf("Too many arguments passed\n");
return 1;
}
srand(time(NULL));
int i, j;
long r;
char *temp;
numOfSlots = atoi(argv[1]);
numOfReducers = atoi(argv[2]);
buffer = (struct Buffer *)mmap(NULL, numOfReducers * sizeof(struct Buffer), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
if(buffer == MAP_FAILED){
printf("EXITING");
exit(EXIT_FAILURE);
}
done = (int *)mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
if (done == MAP_FAILED){
printf("exiting\n");
}
*done = 0;
for(i=0; i<numOfReducers; i++){
buffer[i].Tuples = mmap(NULL, numOfSlots * sizeof(char *), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
if(buffer[i].Tuples == MAP_FAILED){
printf("EXITING");
exit(EXIT_FAILURE);
}
for(j=0; j<numOfSlots; j++){
temp = (char *)mmap(NULL, 30 * sizeof(char), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
temp = strcpy(temp, "");
buffer[i].Tuples[j] = temp;
}
sem_init(&buffer[i].buf_mutex, 1, 1);
sem_init(&buffer[i].fill_count, 1, 0);
sem_init(&buffer[i].empty_count, 1, numOfSlots);
}
for(r=0;r<numOfReducers;r++){ // loop will run n times (n=5)
if(fork() == 0){
printf("[son] pid %d from [parent] pid %d\n",getpid(),getppid());
reducer(r, buffer, done);
exit(0);
}
}
mapper();
for(r=0;r<numOfReducers;r++) // loop will run n times (n=5)
wait(NULL);
}
我正在尝试使用计数信号量即兴创作我为单生产者多消费者多线程编写的程序。我想使用共享内存(mmap() 系统调用)实现进程间通信。我想使用没有支持文件的匿名映射。
这是我想在父进程及其多个子进程之间共享的结构。
typedef struct Buffer{
char **Tuples;
sem_t buf_mutex,empty_count,fill_count;
} Buffer;
Buffer buffer[100];
父进程是 mapper()
函数,它根据一些输入产生一些东西并将其放入缓冲区 [i]。子进程转到 reducer()
函数,该函数使用缓冲区 [j] 中的内容。每个 reducer 或子进程都应该有权访问其缓冲区。子进程在main函数中forked()然后父进程控制到mapper()
。我已将同步原语初始化为进程共享。
我的 main()
方法是否正确?我还遇到 mmap()
的 return 值的类型转换错误,这是一个指针,但我不确定如何处理它然后使用它。我还认为 malloc()
不应在第 47 行中用于将 space 分配给元组,而应使用 mmap()
本身。有人可以帮忙吗?
这是我的程序 -
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <sys/mman.h>
typedef struct Buffer{
char **Tuples;
// int count;
sem_t buf_mutex,empty_count,fill_count;
} Buffer;
Buffer buffer[100];
int numOfSlots;
int numOfReducers;
void mapper(){
//Synchronization primitives (counting semaphores) used for synchronization
//Produce something and put it in buffer[i]
}
void reducer(long b){
//Synchronization primitives (counting semaphores) for synchronization
//Consume from buffer[b]
}
int main (int argc, char *argv[]){
if(argc != 3) {
if(argc < 3)
printf("Insufficient arguments passed\n");
else
printf("Too many arguments passed\n");
return 1;
}
int i;
long r;
numOfSlots = atoi(argv[1]);
numOfReducers = atoi(argv[2]);
for(i=0; i<numOfReducers; i++){
buffer[i] = (struct Buffer *) mmap(NULL, sizeof(buffer[i]), PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_ANONYMOUS, -1, 0);
if (buffer[i] == MAP_FAILED)
errExit("mmap");
buffer[i].Tuples = malloc(numOfSlots * sizeof(char *));
sem_init(&buffer[i].buf_mutex, 1, 1);
sem_init(&buffer[i].fill_count, 1, 0);
sem_init(&buffer[i].empty_count, 1, numOfSlots);
}
for(r=0;r<numOfReducers;r++){ // loop will run n times (n=5)
if(fork() == 0){
printf("[son] pid %d from [parent] pid %d\n",getpid(),getppid());
Reducer(r);
exit(0);
}
}
mapper();
for(r=0;r<numOfReducers;r++) // loop will run n times (n=5)
wait(NULL);
}
这些是我尝试访问的链接 - https://computing.llnl.gov/tutorials/pthreads/man/pthread_mutexattr_init.txt https://github.com/bradfa/tlpi-dist/blob/master/mmap/anon_mmap.c
谢谢, 哈里什
网上查了下。我想出了这个有效的解决方案。
我了解到的另一件事是 char *var="hello"
存储在文本段的只读内存中,这意味着子进程也可以访问它。所以 strcpy()
是其他任何事情的更好选择。
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <sys/mman.h>
#include <string.h>
typedef struct Buffer{
char **Tuples;
// int count;
sem_t buf_mutex,empty_count,fill_count;
int inSlotIndex;
int outSlotIndex;
} Buffer;
Buffer *buffer;
int numOfSlots;
int numOfReducers;
int *done;
void mapper(){
//Synchronization primitives (counting semaphores) used for synchronization
//read continuously from a file, produce something and put it in buffer[i]
//Here is an example
char *temp = "trail";
sem_wait(&buffer[1].empty_count);
sem_wait(&buffer[1].buf_mutex);
// Use strcpy() for anything other than string literal
buffer[1].Tuples[0] = temp;
buffer[1].inSlotIndex = 3;
buffer[1].outSlotIndex = 4;
sem_post(&buffer[1].buf_mutex);
sem_post(&buffer[1].fill_count);
*done = 1;
}
void reducer(long tid, Buffer *buffer, int *done){
//Synchronization primitives (counting semaphores) used for synchronization
//Consume from buffer[b]
sem_wait(&buffer[tid].fill_count);
sem_wait(&buffer[tid].buf_mutex);
printf("%s\n", buffer[tid].Tuples[0]);
printf("%d\n", buffer[tid].inSlotIndex);
printf("%d\n", buffer[tid].outSlotIndex);
sem_post(&buffer[tid].buf_mutex);
sem_post(&buffer[tid].empty_count);
if(*done == 1)
printf("DONE\n");
}
int main (int argc, char *argv[])
{
if(argc != 3) {
if(argc < 3)
printf("Insufficient arguments passed\n");
else
printf("Too many arguments passed\n");
return 1;
}
srand(time(NULL));
int i, j;
long r;
char *temp;
numOfSlots = atoi(argv[1]);
numOfReducers = atoi(argv[2]);
buffer = (struct Buffer *)mmap(NULL, numOfReducers * sizeof(struct Buffer), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
if(buffer == MAP_FAILED){
printf("EXITING");
exit(EXIT_FAILURE);
}
done = (int *)mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
if (done == MAP_FAILED){
printf("exiting\n");
}
*done = 0;
for(i=0; i<numOfReducers; i++){
buffer[i].Tuples = mmap(NULL, numOfSlots * sizeof(char *), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
if(buffer[i].Tuples == MAP_FAILED){
printf("EXITING");
exit(EXIT_FAILURE);
}
for(j=0; j<numOfSlots; j++){
temp = (char *)mmap(NULL, 30 * sizeof(char), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
temp = strcpy(temp, "");
buffer[i].Tuples[j] = temp;
}
sem_init(&buffer[i].buf_mutex, 1, 1);
sem_init(&buffer[i].fill_count, 1, 0);
sem_init(&buffer[i].empty_count, 1, numOfSlots);
}
for(r=0;r<numOfReducers;r++){ // loop will run n times (n=5)
if(fork() == 0){
printf("[son] pid %d from [parent] pid %d\n",getpid(),getppid());
reducer(r, buffer, done);
exit(0);
}
}
mapper();
for(r=0;r<numOfReducers;r++) // loop will run n times (n=5)
wait(NULL);
}