使用命名信号量的死锁
Deadlock using named semaphore
我正在尝试我的第一个 运行 使用共享内存和命名信号量来同步对它的访问。
我的程序有 3 个进程 - Parent 一个和两个 child,都必须使用相同的进程
共享内存。为了在它们之间同步,我使用了命名的 sempahore。
它们共享的资源是一个 3 整数数组,当 array[0] - 退出标志位置 时,child 进程在它们操作之前读取它以确定是否 parent 进程想要退出。
array[1], array[2] - 用于与 parent process 通信,每个进程在自己的数组单元格中放置一条消息,并且 parent进程读取它,并在响应中放置一个 ACK 消息。
我正在尝试获得我的代码的基本工作流程 - 获取所有必要的资源,让 parent 休眠 3 秒,然后启动 exit_procedure。
我的问题是,当到达 exit_procedure 时,主进程永远阻塞在 sem_wait() 操作 - apparently 死锁。
我试图找出问题所在,但似乎无法确定。
我是处理同步的新手 - 在此代码之前我只同步了线程。
更新:
我已经切换到使用 POSIX 内存映射,现在我遇到了同样的死锁问题。 process_api.h 中的相关方法无法获得锁,它们只会永远阻塞。我不知道我做错了什么。
有人可以帮忙吗?
我的代码:
主文件:
int *shmem_p; //!< Shared variable to be used across different proccesses
int shmem_fd; //!< Shared memory id
sem_t *sem_p; //!< Sempahore for syncronizing access to shared memory
volatile sig_atomic_t done; //!< An atomic flag to signal this process threads they are done
volatile sig_atomic_t signal_rcvd; //!< Indication to exit procedure if a signal caused termination
/**
* @brief Exit procedure to be called when program is done
*/
static void exit_procedure()
{
block_all_signals(); /* Block all signals - we're already existing */
if(signal_rcvd == SIGTERM) { /* SIGTERM is manually raised by us when a thread terminates, thus not handled in signal handler */
write(STDERR_FILENO, "Error occured - thread terminated\n", 33);
}
if( !signal_rcvd ) { /* We got here normally, or by thread termination - set done flag */
done = true;
}
/* Free all relevant resources */
sem_unlink("/shmemory");
sem_close(sem_p);
munmap(shmem_p, TOTAL_PROC_NUM*sizeof(int));
shm_unlink("/shmemory");
sem_p = NULL;
shmem_p = NULL;
}
static void signal_handler(int sig_num) {
switch(sig_num) {
case SIGCHLD:
write(STDERR_FILENO, "Error occured - Child process terminated\n", 43);
break;
case SIGALRM:
write(STDOUT_FILENO, "Successfully finished sim\n", 28);
break;
default:
fprintf(stderr, "Error - Signal %s has been raised", strsignal(sig_num));
fflush(stderr);
break;
}
done = true;
signal_rcvd = true;
}
static status_t init_procedure()
{
done = false;
signal_rcvd = false;
size_t size = TOTAL_PROC_NUM*sizeof(int);
/* Initialize shared memory to be used as an exit flag to be used by all processes */
shmem_fd = shm_open("/shmemory", O_CREAT | O_TRUNC | O_RDWR, 0644);
if(shmem_fd < 0) {
error_and_exit("shm_open() failed, err = ", errno);
}
if(ftruncate(shmem_fd, size)) {
shm_unlink("/shmemory");
error_and_exit("ftruncate() failed, err = ", errno);
}
shmem_p = (int *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, shmem_fd, 0);
if(shmem_p == MAP_FAILED) {
shm_unlink("/shmemory");
error_and_exit("mmap() failed, err = ", errno);
}
close(shmem_fd); /* No longer needed */
memset(shmem_p, 0, size);
/* Initialize a named sempahore for the procceses shared memory */
sem_p = sem_open("/shsemaphore", O_CREAT | O_RDWR, 0644, 1);
if(SEM_FAILED == sem_p) {
error("sem_open() failed, err = ", errno);
munmap(shmem_p, size);
shm_unlink("/shmemory");
}
/* Initialize memory access invokers processes */
if(processes_init() != SUCCESS) {
error("init_processes() failed\n", ERR);
munmap(shmem_p, size);
shm_unlink("/shmemory");
sem_close(sem_p);
return FAILURE;
}
/* Handle Signals - Ignore SIGINT, SIGQUIT, handle SIGCHLD & SIGALRM */
struct sigaction sig_handler;
sig_handler.sa_flags = 0;
if(sigfillset(&sig_handler.sa_mask)) { /* Mask all other signals inside the handler */
error("sigemptyset() failed, err = ", errno);
exit_procedure();
return FAILURE;
}
sig_handler.sa_handler = signal_handler;
if(sigaction(SIGCHLD, &sig_handler, NULL) || sigaction(SIGALRM, &sig_handler, NULL)) { /* Set the signal handler for SIGCHLD & SIGALRM */
error("sigaction() failed, err = ", errno);
exit_procedure();
return FAILURE;
}
sig_handler.sa_handler = SIG_IGN;
if(sigaction(SIGINT, &sig_handler, NULL) || sigaction(SIGQUIT, &sig_handler, NULL)) { /* Ignore ctrl+c and ctrl+z */
error("sigaction() failed, err = ", errno);
exit_procedure();
return FAILURE;
}
return SUCCESS;
}
int main(int argc, char *argv[])
{
if(argc != 1) {
fprintf(stderr, "usage: %s (no arguments allowed)\n", argv[0]);
exit(EXIT_FAILURE);
}
if(SUCCESS != init_procedure()) {
error_and_exit("init_procedure() failed\n", ERR);
}
sleep(5);
exit_procedure();
return EXIT_SUCCESS;
}
进程处理程序:
#define WR_RATE (0.8) //!< Writing probabilty when invoking memory access
#define WR_RATE_INT (WR_RATE*10) //!< WR_RATE as an int value between 1 and 10
#define INTER_MEM_ACCS_T (100000) //!< Waiting time between memory accesses
static pid_t child_pids[CHILD_PROC_NUM];
int process_cnt; //!< Determines the index of the process, for child processes
extern sem_t *sem_p;
static bool is_ack_received(int *mem_p, off_t size)
{
bool ack;
/*********************************************************/
/** Critical Section start **/
if((sem_wait(sem_p) != 0) && (errno != EINTR)) {
munmap(mem_p, size);
shm_unlink("/shmemory");
error_and_Exit("sem_wait() failed, err = ", errno);
}
ack = (mem_p[process_cnt] == MSG_ACK);
if(ack) {// TODO - Remove
fprintf(stdout, "Process %d received ACK\n", process_cnt);
fflush(stdout);
}
if((sem_post(sem_p) != 0) && (errno != EINTR)) {
munmap(mem_p, size);
shm_unlink("/shmemory");
error_and_Exit("sem_post() failed, err = ", errno);
}
/** Critical Section end **/
/*********************************************************/
return ack;
}
static void invoke_memory_access(int *mem_p, off_t size)
{
msg_e send_msg = MSG_READ;
if(rand_range(1, 10) <= WR_RATE_INT) { /* Write Memory */
send_msg = MSG_WRITE;
}
/*********************************************************/
/** Critical Section start **/
if((sem_wait(sem_p) != 0) && (errno != EINTR)) {
munmap(mem_p, size);
shm_unlink("/shmemory");
error_and_Exit("sem_wait() failed, err = ", errno);
}
mem_p[process_cnt] = send_msg;
fprintf(stdout, "Process %d sent MSG_%d in mem_address: %p\n", process_cnt, send_msg, &mem_p[process_cnt]); // TODO - Remove
fflush(stdout);
if((sem_post(sem_p) != 0) && (errno != EINTR)) {
munmap(mem_p, size);
shm_unlink("/shmemory");
error_and_Exit("sem_post() failed, err = ", errno);
}
/** Critical Section end **/
/*********************************************************/
}
static void main_loop()
{
int shmem_fd = shm_open("/shmemory", O_RDWR, 0);
if(shmem_fd < 0) {
error_and_Exit("shm_open() failed, err = ", errno);
}
struct stat mem_stat;
fstat(shmem_fd, &mem_stat);
int *child_memory_p = (int *)mmap(NULL, mem_stat.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, shmem_fd, 0);
if(child_memory_p == MAP_FAILED) {
shm_unlink("/shmemory");
error_and_Exit("mmap() failed, err = ", errno);
}
close(shmem_fd); /* No longer needed */
bool first_run = true;
bool ack = false;
const struct timespec ns_wait = {.tv_sec = 0, .tv_nsec = INTER_MEM_ACCS_T};
while(child_memory_p[0] != MSG_EXIT) {
if( !first_run ) { /* Not the first run, check for ack */
ack = is_ack_received(child_memory_p, mem_stat.st_size);
}
nanosleep(&ns_wait, NULL);
if( !first_run && !ack ) { /* No ack received for latest call, nothing to be done */
continue;
}
invoke_memory_access(child_memory_p, mem_stat.st_size);
if(first_run) { /* First run is over.. */
first_run = false;
}
}
fprintf(stdout, "PROCCESS %d EXIT!\n", process_cnt); // TODO Remove this
fflush(stdout);
munmap(child_memory_p, mem_stat.st_size);
shm_unlink("/shmemory");
child_memory_p = NULL;
_Exit(EXIT_SUCCESS);
}
status_t processes_init()
{
pid_t pid;
process_cnt = 1; /* Will be used for child processes to determine their order creation */
int i;
for(i = 0; i < CHILD_PROC_NUM; ++i) {
pid = fork();
if(ERR == pid) {
error("fork() failed, err = ", errno);
return FAILURE;
} else if(pid != 0) { /* Parent process */
child_pids[i] = pid;
process_cnt++;
} else { /* Child process */
block_all_signals(); /* Only main process responsible for indicate exit to its child*/
main_loop();
}
}
return SUCCESS;
}
void processes_deinit(int **mem_p)
{
(*mem_p)[0] = MSG_EXIT;
fprintf(stdout, "EXIT wrriten to address %p\n", *mem_p);
/* Wait for all child processes to terminate */
int i;
write(STDOUT_FILENO, "Waiting for children to exit\n", 29); // TODO Remove this
for(i = 0; i < CHILD_PROC_NUM; ++i) {
if((ERR == waitpid(child_pids[i], NULL, 0)) && (ECHILD != errno)) {
error("waitpid() failed, err = ", errno);
}
}
fprintf(stdout, "PROCCESS DEINIT DONE!\n"); // TODO Remove this
fflush(stdout);
}
谁能解释一下我做错了什么?
我试过了:
从主进程传递 sem_t 指针作为 *sem_t **semaphore_p* 到 processes_init 方法,并让每个 child 使用指向信号量的真实指针(即使 child 将指针复制到 COW 机制上,他仍将使用实际地址
谢谢
在 进程处理程序中将 sem_t 指针声明为外部指针
打开每个 child 进程(在 main_loop 方法中)一个 "copy" 的命名信号量使用 sem_open("/shsemaphore", O_RDWR)
None 其中有效。伙计们,我要疯了,请帮助我:(
函数init_procedure()中打开信号量后,有一个函数调用
sem_unlink(shared_sem_name);
始终执行,创建后立即销毁信号量。您希望在错误处理块中使用这一行。
找到解决方案:
在主文件中创建命名信号量时,权限设置为 0644,
给进程组只读权限。
更改为以下内容后:
sem_open(semaphore_name, O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP, 1)
问题好像解决了!
显然,如果在没有 read\write 信号量权限的情况下调用 sem_wait (这发生在子进程中 - 他们仅使用具有 READ 权限的信号量)行为是未定义的
我正在尝试我的第一个 运行 使用共享内存和命名信号量来同步对它的访问。
我的程序有 3 个进程 - Parent 一个和两个 child,都必须使用相同的进程 共享内存。为了在它们之间同步,我使用了命名的 sempahore。 它们共享的资源是一个 3 整数数组,当 array[0] - 退出标志位置 时,child 进程在它们操作之前读取它以确定是否 parent 进程想要退出。 array[1], array[2] - 用于与 parent process 通信,每个进程在自己的数组单元格中放置一条消息,并且 parent进程读取它,并在响应中放置一个 ACK 消息。
我正在尝试获得我的代码的基本工作流程 - 获取所有必要的资源,让 parent 休眠 3 秒,然后启动 exit_procedure。
我的问题是,当到达 exit_procedure 时,主进程永远阻塞在 sem_wait() 操作 - apparently 死锁。 我试图找出问题所在,但似乎无法确定。 我是处理同步的新手 - 在此代码之前我只同步了线程。
更新: 我已经切换到使用 POSIX 内存映射,现在我遇到了同样的死锁问题。 process_api.h 中的相关方法无法获得锁,它们只会永远阻塞。我不知道我做错了什么。 有人可以帮忙吗?
我的代码:
主文件:
int *shmem_p; //!< Shared variable to be used across different proccesses
int shmem_fd; //!< Shared memory id
sem_t *sem_p; //!< Sempahore for syncronizing access to shared memory
volatile sig_atomic_t done; //!< An atomic flag to signal this process threads they are done
volatile sig_atomic_t signal_rcvd; //!< Indication to exit procedure if a signal caused termination
/**
* @brief Exit procedure to be called when program is done
*/
static void exit_procedure()
{
block_all_signals(); /* Block all signals - we're already existing */
if(signal_rcvd == SIGTERM) { /* SIGTERM is manually raised by us when a thread terminates, thus not handled in signal handler */
write(STDERR_FILENO, "Error occured - thread terminated\n", 33);
}
if( !signal_rcvd ) { /* We got here normally, or by thread termination - set done flag */
done = true;
}
/* Free all relevant resources */
sem_unlink("/shmemory");
sem_close(sem_p);
munmap(shmem_p, TOTAL_PROC_NUM*sizeof(int));
shm_unlink("/shmemory");
sem_p = NULL;
shmem_p = NULL;
}
static void signal_handler(int sig_num) {
switch(sig_num) {
case SIGCHLD:
write(STDERR_FILENO, "Error occured - Child process terminated\n", 43);
break;
case SIGALRM:
write(STDOUT_FILENO, "Successfully finished sim\n", 28);
break;
default:
fprintf(stderr, "Error - Signal %s has been raised", strsignal(sig_num));
fflush(stderr);
break;
}
done = true;
signal_rcvd = true;
}
static status_t init_procedure()
{
done = false;
signal_rcvd = false;
size_t size = TOTAL_PROC_NUM*sizeof(int);
/* Initialize shared memory to be used as an exit flag to be used by all processes */
shmem_fd = shm_open("/shmemory", O_CREAT | O_TRUNC | O_RDWR, 0644);
if(shmem_fd < 0) {
error_and_exit("shm_open() failed, err = ", errno);
}
if(ftruncate(shmem_fd, size)) {
shm_unlink("/shmemory");
error_and_exit("ftruncate() failed, err = ", errno);
}
shmem_p = (int *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, shmem_fd, 0);
if(shmem_p == MAP_FAILED) {
shm_unlink("/shmemory");
error_and_exit("mmap() failed, err = ", errno);
}
close(shmem_fd); /* No longer needed */
memset(shmem_p, 0, size);
/* Initialize a named sempahore for the procceses shared memory */
sem_p = sem_open("/shsemaphore", O_CREAT | O_RDWR, 0644, 1);
if(SEM_FAILED == sem_p) {
error("sem_open() failed, err = ", errno);
munmap(shmem_p, size);
shm_unlink("/shmemory");
}
/* Initialize memory access invokers processes */
if(processes_init() != SUCCESS) {
error("init_processes() failed\n", ERR);
munmap(shmem_p, size);
shm_unlink("/shmemory");
sem_close(sem_p);
return FAILURE;
}
/* Handle Signals - Ignore SIGINT, SIGQUIT, handle SIGCHLD & SIGALRM */
struct sigaction sig_handler;
sig_handler.sa_flags = 0;
if(sigfillset(&sig_handler.sa_mask)) { /* Mask all other signals inside the handler */
error("sigemptyset() failed, err = ", errno);
exit_procedure();
return FAILURE;
}
sig_handler.sa_handler = signal_handler;
if(sigaction(SIGCHLD, &sig_handler, NULL) || sigaction(SIGALRM, &sig_handler, NULL)) { /* Set the signal handler for SIGCHLD & SIGALRM */
error("sigaction() failed, err = ", errno);
exit_procedure();
return FAILURE;
}
sig_handler.sa_handler = SIG_IGN;
if(sigaction(SIGINT, &sig_handler, NULL) || sigaction(SIGQUIT, &sig_handler, NULL)) { /* Ignore ctrl+c and ctrl+z */
error("sigaction() failed, err = ", errno);
exit_procedure();
return FAILURE;
}
return SUCCESS;
}
int main(int argc, char *argv[])
{
if(argc != 1) {
fprintf(stderr, "usage: %s (no arguments allowed)\n", argv[0]);
exit(EXIT_FAILURE);
}
if(SUCCESS != init_procedure()) {
error_and_exit("init_procedure() failed\n", ERR);
}
sleep(5);
exit_procedure();
return EXIT_SUCCESS;
}
进程处理程序:
#define WR_RATE (0.8) //!< Writing probabilty when invoking memory access
#define WR_RATE_INT (WR_RATE*10) //!< WR_RATE as an int value between 1 and 10
#define INTER_MEM_ACCS_T (100000) //!< Waiting time between memory accesses
static pid_t child_pids[CHILD_PROC_NUM];
int process_cnt; //!< Determines the index of the process, for child processes
extern sem_t *sem_p;
static bool is_ack_received(int *mem_p, off_t size)
{
bool ack;
/*********************************************************/
/** Critical Section start **/
if((sem_wait(sem_p) != 0) && (errno != EINTR)) {
munmap(mem_p, size);
shm_unlink("/shmemory");
error_and_Exit("sem_wait() failed, err = ", errno);
}
ack = (mem_p[process_cnt] == MSG_ACK);
if(ack) {// TODO - Remove
fprintf(stdout, "Process %d received ACK\n", process_cnt);
fflush(stdout);
}
if((sem_post(sem_p) != 0) && (errno != EINTR)) {
munmap(mem_p, size);
shm_unlink("/shmemory");
error_and_Exit("sem_post() failed, err = ", errno);
}
/** Critical Section end **/
/*********************************************************/
return ack;
}
static void invoke_memory_access(int *mem_p, off_t size)
{
msg_e send_msg = MSG_READ;
if(rand_range(1, 10) <= WR_RATE_INT) { /* Write Memory */
send_msg = MSG_WRITE;
}
/*********************************************************/
/** Critical Section start **/
if((sem_wait(sem_p) != 0) && (errno != EINTR)) {
munmap(mem_p, size);
shm_unlink("/shmemory");
error_and_Exit("sem_wait() failed, err = ", errno);
}
mem_p[process_cnt] = send_msg;
fprintf(stdout, "Process %d sent MSG_%d in mem_address: %p\n", process_cnt, send_msg, &mem_p[process_cnt]); // TODO - Remove
fflush(stdout);
if((sem_post(sem_p) != 0) && (errno != EINTR)) {
munmap(mem_p, size);
shm_unlink("/shmemory");
error_and_Exit("sem_post() failed, err = ", errno);
}
/** Critical Section end **/
/*********************************************************/
}
static void main_loop()
{
int shmem_fd = shm_open("/shmemory", O_RDWR, 0);
if(shmem_fd < 0) {
error_and_Exit("shm_open() failed, err = ", errno);
}
struct stat mem_stat;
fstat(shmem_fd, &mem_stat);
int *child_memory_p = (int *)mmap(NULL, mem_stat.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, shmem_fd, 0);
if(child_memory_p == MAP_FAILED) {
shm_unlink("/shmemory");
error_and_Exit("mmap() failed, err = ", errno);
}
close(shmem_fd); /* No longer needed */
bool first_run = true;
bool ack = false;
const struct timespec ns_wait = {.tv_sec = 0, .tv_nsec = INTER_MEM_ACCS_T};
while(child_memory_p[0] != MSG_EXIT) {
if( !first_run ) { /* Not the first run, check for ack */
ack = is_ack_received(child_memory_p, mem_stat.st_size);
}
nanosleep(&ns_wait, NULL);
if( !first_run && !ack ) { /* No ack received for latest call, nothing to be done */
continue;
}
invoke_memory_access(child_memory_p, mem_stat.st_size);
if(first_run) { /* First run is over.. */
first_run = false;
}
}
fprintf(stdout, "PROCCESS %d EXIT!\n", process_cnt); // TODO Remove this
fflush(stdout);
munmap(child_memory_p, mem_stat.st_size);
shm_unlink("/shmemory");
child_memory_p = NULL;
_Exit(EXIT_SUCCESS);
}
status_t processes_init()
{
pid_t pid;
process_cnt = 1; /* Will be used for child processes to determine their order creation */
int i;
for(i = 0; i < CHILD_PROC_NUM; ++i) {
pid = fork();
if(ERR == pid) {
error("fork() failed, err = ", errno);
return FAILURE;
} else if(pid != 0) { /* Parent process */
child_pids[i] = pid;
process_cnt++;
} else { /* Child process */
block_all_signals(); /* Only main process responsible for indicate exit to its child*/
main_loop();
}
}
return SUCCESS;
}
void processes_deinit(int **mem_p)
{
(*mem_p)[0] = MSG_EXIT;
fprintf(stdout, "EXIT wrriten to address %p\n", *mem_p);
/* Wait for all child processes to terminate */
int i;
write(STDOUT_FILENO, "Waiting for children to exit\n", 29); // TODO Remove this
for(i = 0; i < CHILD_PROC_NUM; ++i) {
if((ERR == waitpid(child_pids[i], NULL, 0)) && (ECHILD != errno)) {
error("waitpid() failed, err = ", errno);
}
}
fprintf(stdout, "PROCCESS DEINIT DONE!\n"); // TODO Remove this
fflush(stdout);
}
谁能解释一下我做错了什么?
我试过了:
从主进程传递 sem_t 指针作为 *sem_t **semaphore_p* 到 processes_init 方法,并让每个 child 使用指向信号量的真实指针(即使 child 将指针复制到 COW 机制上,他仍将使用实际地址 谢谢
在 进程处理程序中将 sem_t 指针声明为外部指针
打开每个 child 进程(在 main_loop 方法中)一个 "copy" 的命名信号量使用 sem_open("/shsemaphore", O_RDWR)
None 其中有效。伙计们,我要疯了,请帮助我:(
函数init_procedure()中打开信号量后,有一个函数调用
sem_unlink(shared_sem_name);
始终执行,创建后立即销毁信号量。您希望在错误处理块中使用这一行。
找到解决方案:
在主文件中创建命名信号量时,权限设置为 0644, 给进程组只读权限。
更改为以下内容后:
sem_open(semaphore_name, O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP, 1)
问题好像解决了! 显然,如果在没有 read\write 信号量权限的情况下调用 sem_wait (这发生在子进程中 - 他们仅使用具有 READ 权限的信号量)行为是未定义的