始终取消链接()共享内存中的 POSIX 命名信号量

Always unlink() the POSIX named semaphore in shared memory

在下面的代码中,每个线程运行 sell_tickets() 函数,获取互斥锁并减少票数。为了限制活动线程数,checkin()checkout()线程是使用信号量的生产者-消费者模型。

但是在单元测试中,似乎消费者函数 checkout() 在生产者 checkin() 甚至创建新线程之前调用了 pthread_join() 函数。所以有什么不对吗?是因为线程不共享堆栈内存段,因为我没有 malloc() 堆 space 一些参数。

/**
 * Using semaphore to limit maximum thread number.
 */
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>

/**
 * If arguments that pthread_create() pass to sell_tickets() are more than one, 
 * they need to be wrapped into a single structure.
 */
typedef struct {
    unsigned agent_id;              // simulate an agent
    unsigned tickets_tosell;        // agent's personal goal of the day
    pthread_mutex_t *pool_lock;     // mutex lock for visiting the shared tickets pool
    unsigned *tickets_pool;         // shared tickets pool
} agent;

/**
 * Constructor
 */
static void new_agent(agent *a, unsigned agentid, unsigned ticketsnum, pthread_mutex_t *lock, unsigned *tickets_pool) {
    a->agent_id = agentid;
    a->tickets_tosell = ticketsnum;
    a->pool_lock = lock;
    a->tickets_pool = tickets_pool;
}

/**
 * Implement void *(*start_rtn)(void *);
 * -------------------------------------
 * Each thread execute this function.
 */
static void *sell_tickets(void *agent_addr) {
    agent *a = (agent *)agent_addr;
    while (a->tickets_tosell > 0) {
        pthread_mutex_lock(a->pool_lock);
        (*a->tickets_pool)--;
        printf("agent@%d sells a ticket, %d tickets left in pool.\n", a->agent_id, *a->tickets_pool);
        pthread_mutex_unlock(a->pool_lock);
        a->tickets_tosell--;
        printf("agent@%d has %d tickets to sell.\n", a->agent_id, a->tickets_tosell);
    }
    pthread_exit((void *)&a->agent_id); 
}

typedef struct {
    /* shared threads pool */
    pthread_t *pool_addr;
    unsigned pool_size;
    /* a pair of producer-consumer semaphore */
    sem_t *producer_sem;
    sem_t *consumer_sem;
    /* producer & consumer thread id */
    pthread_t *producer_tid;
    pthread_t *consumer_tid;
} threads_pool;

static void new_threads_pool(threads_pool *tp, 
                        pthread_t *pool_addr, unsigned pool_size, 
                        sem_t *producer_sem, sem_t *consumer_sem, 
                        pthread_t *ptid, pthread_t *ctid) {
    tp->pool_addr = pool_addr;
    tp->pool_size = pool_size;
    tp->producer_sem = producer_sem;
    tp->consumer_sem = consumer_sem;
    tp->producer_tid = ptid;
    tp->consumer_tid = ctid;
}

typedef struct {
    sem_t *checkin_b;
    sem_t *checkout_b;
} barrier;

static void new_barrier(barrier *b, sem_t *inb, sem_t *outb) {
    b->checkin_b = inb;
    b->checkout_b = outb;   
}

typedef struct {
    unsigned num_agents;
    unsigned num_tickets;
} project;

static void new_project(project *p, unsigned num_agents, unsigned num_tickets) {
    p->num_agents = num_agents;
    p->num_tickets = num_tickets;
}

typedef struct {
    project *pj;
    threads_pool *tp;
    barrier *b;
} project_params;

static void new_project_params(project_params *pp, project *pj, threads_pool *tp, barrier *b) {
    pp->pj = pj;
    pp->tp = tp;
    pp->b = b;
}

/**
 * producer thread
 * create agent sell_tickets() threads
 */
static void *checkin(void *params) {
    project_params *pp = (project_params *)params;

    /* barrier assure that checkin() and checkout() thread begin to work when two threads are both created. */
    sem_post(pp->b->checkout_b);
    sem_wait(pp->b->checkin_b);

    unsigned tickets_pool = pp->pj->num_tickets;    // shared resourses
    pthread_mutex_t tickets_pool_lock;              // mutex object

    agent agents[pp->pj->num_agents];               // arguments pass to 10 threads
    unsigned id;                                    // agent id
    pthread_t tid;                                  // current sell_tickets() thread id
    int err;                                        // thread_create() function return the error code

    pthread_mutex_init(&tickets_pool_lock, NULL);

    for (unsigned i = 0; i < pp->pj->num_agents; i++) {
        id = i + 1;
        new_agent(&agents[i], id, pp->pj->num_tickets / pp->pj->num_agents, &tickets_pool_lock, &tickets_pool);

        sem_wait(pp->tp->producer_sem);
        err = pthread_create(pp->tp->pool_addr + (i % pp->tp->pool_size), NULL, sell_tickets, &agents[i]);
        if (err != 0) {
            printf("error[%d]: can't create thread.", err);     
            pthread_cancel(*pp->tp->consumer_tid);
            pthread_exit(NULL);
        } else {
            tid = *(pp->tp->pool_addr + (i % pp->tp->pool_size));
            printf("thread of agent@%d created, thread id = @%lx\n", id, (unsigned long)tid);
            sem_post(pp->tp->consumer_sem);
        }
    }

    /* wait for checkout() thread to finish */
    pthread_join(*pp->tp->consumer_tid, NULL);

    pthread_mutex_destroy(&tickets_pool_lock);
    if (tickets_pool == 0) {
        printf("Today's tickets are sold out!\n");
    } else {
        printf("%d tickets left at the end of the day!\n", tickets_pool);
    }
    pthread_exit(NULL);
}

/**
 * consumer thread
 * wait for old agent sell_tickets() thread to exit and increment the producer semaphore
 */
static void *checkout(void *params) {
    project_params *pp = (project_params *)params;

    /* barrier assure that checkin() and checkout() thread begin to work when two threads are both created. */
    sem_post(pp->b->checkin_b);
    sem_wait(pp->b->checkout_b);

    void *tret;     // to store pthread_exit() exit status rval_ptr
    int errcode;    // pthread_join() return err code

    for (unsigned i = 0; i < pp->pj->num_agents; i++) {
        sem_wait(pp->tp->consumer_sem);
        errcode = pthread_join(*(pp->tp->pool_addr + (i % pp->tp->pool_size)), &tret);
        sem_post(pp->tp->producer_sem);
        printf("agent@%d has finished his job!\n", *(unsigned *)tret);
    }
    pthread_exit(NULL);
}

/**
 * producer-consumer model using semaphore
 * maximum 10 agent sell_tickets() threads work at a time
 */
static void run(void) {
    unsigned num_agents = 30;
    unsigned num_tickets = 300;
    /*
     * project
     */
    project pj;
    new_project(&pj, num_agents, num_tickets);

    /*
     * thread pool of maximum 10 thread 
     */
    unsigned threads_pool_size = 10;
    pthread_t pool[threads_pool_size];

    /* 
     * checkin() and checkout() threads
     */
    pthread_t checkin_tid, checkout_tid;
    int checkin_err, checkout_err;
    /* a pair of producer-consumer semaphores */
    sem_t *checkin_sem, *checkout_sem;
    checkin_sem = sem_open("/checkin_sem", O_CREAT, S_IRWXG, threads_pool_size);
    checkout_sem = sem_open("/checkout_sem", O_CREAT, S_IRWXG, 0);
    /* barrier */
    barrier b;
    sem_t *checkin_b, *checkout_b;
    checkin_b = sem_open("/checkin_b", O_CREAT, S_IRWXG, 0);
    checkout_b = sem_open("/checkout_b", O_CREAT, S_IRWXG, 0);
    new_barrier(&b, checkin_b, checkout_b);
    /* wrap checkin() checkout() parameters into a single structure */
    threads_pool tp;
    new_threads_pool(&tp, &pool[0], threads_pool_size, checkin_sem, checkout_sem, &checkin_tid, &checkout_tid);
    project_params pp;
    new_project_params(&pp, &pj, &tp, &b);
    /* create checkin(), checkout() threads */
    checkin_err = pthread_create(&checkin_tid, NULL, checkin, &pp);
    checkout_err = pthread_create(&checkout_tid, NULL, checkout, &pp);
    if (checkin_err != 0 || checkout_err != 0) {
        if (checkin_err != 0) printf("error: checkin() thread creation failed!\n");
        if (checkout_err != 0) printf("error: checkout() thread creation failed!\n");
        sem_close(checkin_sem);
        sem_close(checkout_sem);
        sem_close(checkin_b);
        sem_close(checkout_b);
        return;
    }

    /*
     * wait for checkin() and checkout() to exit
     */
    pthread_join(checkin_tid, NULL);
    pthread_join(checkout_tid, NULL);

    /*
     * free resources
     */
    sem_close(checkin_sem);
    sem_close(checkout_sem);
    sem_close(checkin_b);
    sem_close(checkout_b);
}
    

int main(void) {
    run();
}

lldb 显示 thread_join() return 错误代码 3:ESRCH.

Process 46631 stopped
* thread #3, stop reason = EXC_BAD_ACCESS (code=1, address=0x0)
    frame #0: 0x0000000100003b39 agtktp`checkout(params=0x00007ffeefbff618) at agents_tickets_pool.c:221:46
   217  errcode = pthread_join(*(pp->tp->pool_addr + (i % pp->tp->pool_size)), &tret);
   218  pthread_t tid = *(pp->tp->pool_addr + (i % pp->tp->pool_size));
   219  printf("thread@%u = %lx\n", i + 1, (unsigned long)tid);
   220          sem_post(pp->tp->producer_sem);
-> 221          printf("agent@%d has finished his job!\n", *(unsigned *)tret);
   222      }
   223      pthread_exit(NULL);
   224  }
Target 0: (agtktp) stopped.
(lldb) fr v errcode
(int) errcode = 3
(lldb) fr v tid
(pthread_t) tid = 0x0000000000000050

=== 2021 年 1 月 3 日更新 2 ===

我已经发布了解决方案。如果有什么不对,请随时纠正我。谢谢大家,特别是@Semion。

=== 2021 年 1 月 3 日更新 2 结束 ===

=== 2021 年 1 月 3 日更新 1 ===

目前我已经修复了这个问题,并发现我在下面的代码中犯了一个典型的错误。

我需要一些时间来重新编辑这个问题并给出答案,以使其对以后的 POSIX Semaphore 用户有用。请不要 关闭这个问题(它已经有 2 个接近的投票)。谢谢。

=== 2021 年 1 月 3 日更新 1 结束 ===

错误现已修复。问题出在 sem_close().

根据“Unix Networking Program - Vol.2 - ch10”sem_close() 关闭信号量,但不会将其从系统共享内存中删除。 POSIX 命名信号量是“kernel-persistent”。

所以基本上我为每个 运行 使用了同一对旧信号量,称为 /checkin_semcheckout_sem。一旦程序异常退出,在信号量中留下一些奇怪的值,后续的所有测试都会崩溃。这就是为什么其他一些人说该程序在他们的机器上运行良好的原因。因为他们的系统上没有遗留信号量。

为了解决这个问题,我们应该使用sem_unlink()而不是,这样信号量名称将从系统中删除,信号量的破坏将发生直到最后 sem_close() 发生。由于关闭操作会在进程终止时自动发生,因此我们不必显式编写 sem_close()

sem_unlink("/checkin_sem");
sem_unlink("/checkout_sem");
sem_unlink("/checkin_b");
sem_unlink("/checkout_b");

并且为了防止遗留信号量影响我们的程序,最好在创建信号量时添加O_EXCL,当信号量已经存在时会弹出错误。

checkin_sem = sem_open("/checkin_sem", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, threads_pool_size);
checkout_sem = sem_open("/checkout_sem", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, 0);

最后,对于权限标志,如果程序不涉及进程间通信,所有者的简单readwrite即可,即S_IREAD | S_IWRITES_IRUSR | S_IWUSR.

这是这个生产者-消费者线程池的最终版本。

/**
 * Using semaphore to limit maximum thread number.
 */
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>

/**
 * If arguments that pthread_create() pass to sell_tickets() are more than one, 
 * they need to be wrapped into a single structure.
 */
typedef struct {
    unsigned agent_id;              // simulate an agent
    unsigned tickets_tosell;        // agent's personal goal of the day
    unsigned *tickets_pool;         // shared tickets pool
    pthread_mutex_t *pool_lock;     // mutex lock for visiting the shared tickets pool
} agent;

/**
 * Constructor
 */
static void new_agent(agent *a, unsigned agentid, unsigned tickets_num, unsigned *pool, pthread_mutex_t *lock) {
    a->agent_id = agentid;
    a->tickets_tosell = tickets_num;
    a->tickets_pool = pool;
    a->pool_lock = lock;
}

/**
 * Implement void *(*start_rtn)(void *);
 * -------------------------------------
 * Each thread execute this function.
 */
static void *sell_tickets(void *agent_addr) {
    agent *a = (agent *)agent_addr;
    while (a->tickets_tosell > 0) {
        pthread_mutex_lock(a->pool_lock);   // begin of race condition
        (*a->tickets_pool)--;
        fprintf(stdout, "agent@%d sells a ticket, %d tickets left in pool.\n", a->agent_id, *a->tickets_pool);
        fflush(stdout);
        pthread_mutex_unlock(a->pool_lock); // end of race condition
        a->tickets_tosell--;
        fprintf(stdout, "agent@%d has %d tickets to sell.\n", a->agent_id, a->tickets_tosell);
        fflush(stdout);
    }
    pthread_exit((void *)&a->agent_id); 
}

/**
 * Simulate a tickets selling project
 */
struct {
    unsigned num_agents;
    unsigned num_tickets;
} project;

/**
 * Shared threads pool
 */
struct {
    unsigned pool_size;
    pthread_t *pool;
} threads_pool;

/**
 * Producer, consumer threads id
 */
struct {
    pthread_t producer;
    pthread_t consumer;
} producer_consumer;

/**
 * A pair of semaphore
 */
struct {
    sem_t *producer_sem;
    sem_t *consumer_sem;
} sem;

/**
 * Barrier is not implemented in mac os, we use a pair of semaphore instead.
 */
struct {
    sem_t *producer_b;
    sem_t *consumer_b;
} barrier;

/**
 * producer thread
 * create agent sell_tickets() threads
 */
static void *producer(void *arg) {
    /* barrier assure that checkin() and checkout() thread begin to work when two threads are both created. */
    sem_post(barrier.consumer_b);
    sem_wait(barrier.producer_b);

    unsigned tickets_pool = project.num_tickets;    // shared resource for each agent
    pthread_mutex_t tickets_pool_lock;              // mutex lock for visiting tickets pool

    agent agents[project.num_agents];               // arguments pass to 10 threads
    unsigned id;                                    // agent id
    pthread_t tid;                                  // current sell_tickets() thread id
    int err;                                        // thread_create() return err code

    pthread_mutex_init(&tickets_pool_lock, NULL);

    for (unsigned i = 0; i < project.num_agents; i++) {
        id = i + 1;
        new_agent(&agents[i], id, project.num_tickets / project.num_agents, &tickets_pool, &tickets_pool_lock);

        sem_wait(sem.producer_sem);
        err = pthread_create(threads_pool.pool + i % threads_pool.pool_size, NULL, sell_tickets, &agents[i]);
        if (err != 0) {
            fprintf(stdout, "error[%d]: can't create thread.\n", err);      
            fflush(stdout);
            pthread_cancel(producer_consumer.consumer);
            pthread_exit(NULL);
        } else {
            tid = *(threads_pool.pool + i % threads_pool.pool_size);
            fprintf(stdout, "thread of agent@%d created, thread id = @%lx\n", id, (unsigned long)tid);
            fflush(stdout);
            sem_post(sem.consumer_sem);
        }
    }

    /* wait for checkout() thread to finish */
    pthread_join(producer_consumer.consumer, NULL);

    pthread_mutex_destroy(&tickets_pool_lock);
    if (tickets_pool == 0) {
        fprintf(stdout, "Today's tickets are sold out!\n");
        fflush(stdout);
    } else {
        fprintf(stdout, "%d tickets left at the end of the day!\n", tickets_pool);
        fflush(stdout);
    }
    pthread_exit(NULL);
}

/**
 * consumer thread
 * wait for old agent sell_tickets() thread to exit and increment the producer semaphore
 */
static void *consumer(void *arg) {
    /* barrier assure that checkin() and checkout() thread begin to work when two threads are both created. */
    sem_post(barrier.producer_b);
    sem_wait(barrier.consumer_b);

    void *tret;     // to store pthread_exit() exit status rval_ptr

    for (unsigned i = 0; i < project.num_agents; i++) {
        sem_wait(sem.consumer_sem);
        pthread_join(*(threads_pool.pool + i % threads_pool.pool_size), &tret);
        sem_post(sem.producer_sem);
        fprintf(stdout, "agent@%d has finished his job!\n", *(unsigned *)tret);
        fflush(stdout);
    }
    pthread_exit(NULL);
}

/**
 * maximum 10 agent sell_tickets() threads work at a time
 */
static void run(void) {
    /* project */
    project.num_agents = 30;
    project.num_tickets = 300;
    /* threads pool */
    threads_pool.pool_size = 10;
    pthread_t pool[threads_pool.pool_size];
    threads_pool.pool = &pool[0];
    /* producer-consumer semaphore */
    sem.producer_sem = sem_open("/producer_sem", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, threads_pool.pool_size);
    sem.consumer_sem = sem_open("/consumer_sem", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, 0);
    /* barrier */
    barrier.producer_b = sem_open("/producer_b", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, 0);
    barrier.consumer_b = sem_open("/consumer_b", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, 0);
    /* create producer(), consumer() threads */
    int producer_err = pthread_create(&producer_consumer.producer, NULL, producer, NULL);
    int consumer_err = pthread_create(&producer_consumer.consumer, NULL, consumer, NULL);
    if (producer_err != 0 || consumer_err != 0) {
        if (producer_err != 0) { 
            fprintf(stdout, "error: checkin() thread creation failed!\n");
            fflush(stdout);
        }
        if (consumer_err != 0) {
            fprintf(stdout, "error: checkout() thread creation failed!\n");
            fflush(stdout);
        }
        sem_unlink("/producer_sem");
        sem_unlink("/consumer_sem");
        sem_unlink("/producer_b");
        sem_unlink("/consumer_b");
        return;
    }

    /*
     * wait for checkin() and checkout() to exit
     */
    pthread_join(producer_consumer.producer, NULL);
    pthread_join(producer_consumer.consumer, NULL);

    /*
     * free resources
     * --------------
     * Always use sem_unlink() to remove the semaphore, but not sem_close().
     * Close a semaphore does NOT remove the semaphore from system shared memory.
     * Close operation occurs automatically on process termination, we don't have 
     * to write sem_close() explicitly. 
     */
    sem_unlink("/producer_sem");
    sem_unlink("/consumer_sem");
    sem_unlink("/producer_b");
    sem_unlink("/consumer_b");
}
    

int main(void) {
    run();
}