始终取消链接()共享内存中的 POSIX 命名信号量
Always unlink() the POSIX named semaphore in shared memory
在下面的代码中,每个线程运行 sell_tickets()
函数,获取互斥锁并减少票数。为了限制活动线程数,checkin()
和checkout()
线程是使用信号量的生产者-消费者模型。
但是在单元测试中,似乎消费者函数 checkout()
在生产者 checkin()
甚至创建新线程之前调用了 pthread_join()
函数。所以有什么不对吗?是因为线程不共享堆栈内存段,因为我没有 malloc()
堆 space 一些参数。
/**
* Using semaphore to limit maximum thread number.
*/
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>
/**
* If arguments that pthread_create() pass to sell_tickets() are more than one,
* they need to be wrapped into a single structure.
*/
typedef struct {
unsigned agent_id; // simulate an agent
unsigned tickets_tosell; // agent's personal goal of the day
pthread_mutex_t *pool_lock; // mutex lock for visiting the shared tickets pool
unsigned *tickets_pool; // shared tickets pool
} agent;
/**
* Constructor
*/
static void new_agent(agent *a, unsigned agentid, unsigned ticketsnum, pthread_mutex_t *lock, unsigned *tickets_pool) {
a->agent_id = agentid;
a->tickets_tosell = ticketsnum;
a->pool_lock = lock;
a->tickets_pool = tickets_pool;
}
/**
* Implement void *(*start_rtn)(void *);
* -------------------------------------
* Each thread execute this function.
*/
static void *sell_tickets(void *agent_addr) {
agent *a = (agent *)agent_addr;
while (a->tickets_tosell > 0) {
pthread_mutex_lock(a->pool_lock);
(*a->tickets_pool)--;
printf("agent@%d sells a ticket, %d tickets left in pool.\n", a->agent_id, *a->tickets_pool);
pthread_mutex_unlock(a->pool_lock);
a->tickets_tosell--;
printf("agent@%d has %d tickets to sell.\n", a->agent_id, a->tickets_tosell);
}
pthread_exit((void *)&a->agent_id);
}
typedef struct {
/* shared threads pool */
pthread_t *pool_addr;
unsigned pool_size;
/* a pair of producer-consumer semaphore */
sem_t *producer_sem;
sem_t *consumer_sem;
/* producer & consumer thread id */
pthread_t *producer_tid;
pthread_t *consumer_tid;
} threads_pool;
static void new_threads_pool(threads_pool *tp,
pthread_t *pool_addr, unsigned pool_size,
sem_t *producer_sem, sem_t *consumer_sem,
pthread_t *ptid, pthread_t *ctid) {
tp->pool_addr = pool_addr;
tp->pool_size = pool_size;
tp->producer_sem = producer_sem;
tp->consumer_sem = consumer_sem;
tp->producer_tid = ptid;
tp->consumer_tid = ctid;
}
typedef struct {
sem_t *checkin_b;
sem_t *checkout_b;
} barrier;
static void new_barrier(barrier *b, sem_t *inb, sem_t *outb) {
b->checkin_b = inb;
b->checkout_b = outb;
}
typedef struct {
unsigned num_agents;
unsigned num_tickets;
} project;
static void new_project(project *p, unsigned num_agents, unsigned num_tickets) {
p->num_agents = num_agents;
p->num_tickets = num_tickets;
}
typedef struct {
project *pj;
threads_pool *tp;
barrier *b;
} project_params;
static void new_project_params(project_params *pp, project *pj, threads_pool *tp, barrier *b) {
pp->pj = pj;
pp->tp = tp;
pp->b = b;
}
/**
* producer thread
* create agent sell_tickets() threads
*/
static void *checkin(void *params) {
project_params *pp = (project_params *)params;
/* barrier assure that checkin() and checkout() thread begin to work when two threads are both created. */
sem_post(pp->b->checkout_b);
sem_wait(pp->b->checkin_b);
unsigned tickets_pool = pp->pj->num_tickets; // shared resourses
pthread_mutex_t tickets_pool_lock; // mutex object
agent agents[pp->pj->num_agents]; // arguments pass to 10 threads
unsigned id; // agent id
pthread_t tid; // current sell_tickets() thread id
int err; // thread_create() function return the error code
pthread_mutex_init(&tickets_pool_lock, NULL);
for (unsigned i = 0; i < pp->pj->num_agents; i++) {
id = i + 1;
new_agent(&agents[i], id, pp->pj->num_tickets / pp->pj->num_agents, &tickets_pool_lock, &tickets_pool);
sem_wait(pp->tp->producer_sem);
err = pthread_create(pp->tp->pool_addr + (i % pp->tp->pool_size), NULL, sell_tickets, &agents[i]);
if (err != 0) {
printf("error[%d]: can't create thread.", err);
pthread_cancel(*pp->tp->consumer_tid);
pthread_exit(NULL);
} else {
tid = *(pp->tp->pool_addr + (i % pp->tp->pool_size));
printf("thread of agent@%d created, thread id = @%lx\n", id, (unsigned long)tid);
sem_post(pp->tp->consumer_sem);
}
}
/* wait for checkout() thread to finish */
pthread_join(*pp->tp->consumer_tid, NULL);
pthread_mutex_destroy(&tickets_pool_lock);
if (tickets_pool == 0) {
printf("Today's tickets are sold out!\n");
} else {
printf("%d tickets left at the end of the day!\n", tickets_pool);
}
pthread_exit(NULL);
}
/**
* consumer thread
* wait for old agent sell_tickets() thread to exit and increment the producer semaphore
*/
static void *checkout(void *params) {
project_params *pp = (project_params *)params;
/* barrier assure that checkin() and checkout() thread begin to work when two threads are both created. */
sem_post(pp->b->checkin_b);
sem_wait(pp->b->checkout_b);
void *tret; // to store pthread_exit() exit status rval_ptr
int errcode; // pthread_join() return err code
for (unsigned i = 0; i < pp->pj->num_agents; i++) {
sem_wait(pp->tp->consumer_sem);
errcode = pthread_join(*(pp->tp->pool_addr + (i % pp->tp->pool_size)), &tret);
sem_post(pp->tp->producer_sem);
printf("agent@%d has finished his job!\n", *(unsigned *)tret);
}
pthread_exit(NULL);
}
/**
* producer-consumer model using semaphore
* maximum 10 agent sell_tickets() threads work at a time
*/
static void run(void) {
unsigned num_agents = 30;
unsigned num_tickets = 300;
/*
* project
*/
project pj;
new_project(&pj, num_agents, num_tickets);
/*
* thread pool of maximum 10 thread
*/
unsigned threads_pool_size = 10;
pthread_t pool[threads_pool_size];
/*
* checkin() and checkout() threads
*/
pthread_t checkin_tid, checkout_tid;
int checkin_err, checkout_err;
/* a pair of producer-consumer semaphores */
sem_t *checkin_sem, *checkout_sem;
checkin_sem = sem_open("/checkin_sem", O_CREAT, S_IRWXG, threads_pool_size);
checkout_sem = sem_open("/checkout_sem", O_CREAT, S_IRWXG, 0);
/* barrier */
barrier b;
sem_t *checkin_b, *checkout_b;
checkin_b = sem_open("/checkin_b", O_CREAT, S_IRWXG, 0);
checkout_b = sem_open("/checkout_b", O_CREAT, S_IRWXG, 0);
new_barrier(&b, checkin_b, checkout_b);
/* wrap checkin() checkout() parameters into a single structure */
threads_pool tp;
new_threads_pool(&tp, &pool[0], threads_pool_size, checkin_sem, checkout_sem, &checkin_tid, &checkout_tid);
project_params pp;
new_project_params(&pp, &pj, &tp, &b);
/* create checkin(), checkout() threads */
checkin_err = pthread_create(&checkin_tid, NULL, checkin, &pp);
checkout_err = pthread_create(&checkout_tid, NULL, checkout, &pp);
if (checkin_err != 0 || checkout_err != 0) {
if (checkin_err != 0) printf("error: checkin() thread creation failed!\n");
if (checkout_err != 0) printf("error: checkout() thread creation failed!\n");
sem_close(checkin_sem);
sem_close(checkout_sem);
sem_close(checkin_b);
sem_close(checkout_b);
return;
}
/*
* wait for checkin() and checkout() to exit
*/
pthread_join(checkin_tid, NULL);
pthread_join(checkout_tid, NULL);
/*
* free resources
*/
sem_close(checkin_sem);
sem_close(checkout_sem);
sem_close(checkin_b);
sem_close(checkout_b);
}
int main(void) {
run();
}
lldb
显示 thread_join()
return 错误代码 3
:ESRCH
.
Process 46631 stopped
* thread #3, stop reason = EXC_BAD_ACCESS (code=1, address=0x0)
frame #0: 0x0000000100003b39 agtktp`checkout(params=0x00007ffeefbff618) at agents_tickets_pool.c:221:46
217 errcode = pthread_join(*(pp->tp->pool_addr + (i % pp->tp->pool_size)), &tret);
218 pthread_t tid = *(pp->tp->pool_addr + (i % pp->tp->pool_size));
219 printf("thread@%u = %lx\n", i + 1, (unsigned long)tid);
220 sem_post(pp->tp->producer_sem);
-> 221 printf("agent@%d has finished his job!\n", *(unsigned *)tret);
222 }
223 pthread_exit(NULL);
224 }
Target 0: (agtktp) stopped.
(lldb) fr v errcode
(int) errcode = 3
(lldb) fr v tid
(pthread_t) tid = 0x0000000000000050
=== 2021 年 1 月 3 日更新 2 ===
我已经发布了解决方案。如果有什么不对,请随时纠正我。谢谢大家,特别是@Semion。
=== 2021 年 1 月 3 日更新 2 结束 ===
=== 2021 年 1 月 3 日更新 1 ===
目前我已经修复了这个问题,并发现我在下面的代码中犯了一个典型的错误。
我需要一些时间来重新编辑这个问题并给出答案,以使其对以后的 POSIX Semaphore 用户有用。请不要 关闭这个问题(它已经有 2 个接近的投票)。谢谢。
=== 2021 年 1 月 3 日更新 1 结束 ===
错误现已修复。问题出在 sem_close()
.
根据“Unix Networking Program - Vol.2 - ch10”,sem_close()
关闭信号量,但不会将其从系统共享内存中删除。 POSIX 命名信号量是“kernel-persistent”。
所以基本上我为每个 运行 使用了同一对旧信号量,称为 /checkin_sem
和 checkout_sem
。一旦程序异常退出,在信号量中留下一些奇怪的值,后续的所有测试都会崩溃。这就是为什么其他一些人说该程序在他们的机器上运行良好的原因。因为他们的系统上没有遗留信号量。
为了解决这个问题,我们应该使用sem_unlink()
而不是,这样信号量名称将从系统中删除,信号量的破坏将发生直到最后 sem_close()
发生。由于关闭操作会在进程终止时自动发生,因此我们不必显式编写 sem_close()
。
sem_unlink("/checkin_sem");
sem_unlink("/checkout_sem");
sem_unlink("/checkin_b");
sem_unlink("/checkout_b");
并且为了防止遗留信号量影响我们的程序,最好在创建信号量时添加O_EXCL
,当信号量已经存在时会弹出错误。
checkin_sem = sem_open("/checkin_sem", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, threads_pool_size);
checkout_sem = sem_open("/checkout_sem", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, 0);
最后,对于权限标志,如果程序不涉及进程间通信,所有者的简单read
、write
即可,即S_IREAD | S_IWRITE
或 S_IRUSR | S_IWUSR
.
这是这个生产者-消费者线程池的最终版本。
/**
* Using semaphore to limit maximum thread number.
*/
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
/**
* If arguments that pthread_create() pass to sell_tickets() are more than one,
* they need to be wrapped into a single structure.
*/
typedef struct {
unsigned agent_id; // simulate an agent
unsigned tickets_tosell; // agent's personal goal of the day
unsigned *tickets_pool; // shared tickets pool
pthread_mutex_t *pool_lock; // mutex lock for visiting the shared tickets pool
} agent;
/**
* Constructor
*/
static void new_agent(agent *a, unsigned agentid, unsigned tickets_num, unsigned *pool, pthread_mutex_t *lock) {
a->agent_id = agentid;
a->tickets_tosell = tickets_num;
a->tickets_pool = pool;
a->pool_lock = lock;
}
/**
* Implement void *(*start_rtn)(void *);
* -------------------------------------
* Each thread execute this function.
*/
static void *sell_tickets(void *agent_addr) {
agent *a = (agent *)agent_addr;
while (a->tickets_tosell > 0) {
pthread_mutex_lock(a->pool_lock); // begin of race condition
(*a->tickets_pool)--;
fprintf(stdout, "agent@%d sells a ticket, %d tickets left in pool.\n", a->agent_id, *a->tickets_pool);
fflush(stdout);
pthread_mutex_unlock(a->pool_lock); // end of race condition
a->tickets_tosell--;
fprintf(stdout, "agent@%d has %d tickets to sell.\n", a->agent_id, a->tickets_tosell);
fflush(stdout);
}
pthread_exit((void *)&a->agent_id);
}
/**
* Simulate a tickets selling project
*/
struct {
unsigned num_agents;
unsigned num_tickets;
} project;
/**
* Shared threads pool
*/
struct {
unsigned pool_size;
pthread_t *pool;
} threads_pool;
/**
* Producer, consumer threads id
*/
struct {
pthread_t producer;
pthread_t consumer;
} producer_consumer;
/**
* A pair of semaphore
*/
struct {
sem_t *producer_sem;
sem_t *consumer_sem;
} sem;
/**
* Barrier is not implemented in mac os, we use a pair of semaphore instead.
*/
struct {
sem_t *producer_b;
sem_t *consumer_b;
} barrier;
/**
* producer thread
* create agent sell_tickets() threads
*/
static void *producer(void *arg) {
/* barrier assure that checkin() and checkout() thread begin to work when two threads are both created. */
sem_post(barrier.consumer_b);
sem_wait(barrier.producer_b);
unsigned tickets_pool = project.num_tickets; // shared resource for each agent
pthread_mutex_t tickets_pool_lock; // mutex lock for visiting tickets pool
agent agents[project.num_agents]; // arguments pass to 10 threads
unsigned id; // agent id
pthread_t tid; // current sell_tickets() thread id
int err; // thread_create() return err code
pthread_mutex_init(&tickets_pool_lock, NULL);
for (unsigned i = 0; i < project.num_agents; i++) {
id = i + 1;
new_agent(&agents[i], id, project.num_tickets / project.num_agents, &tickets_pool, &tickets_pool_lock);
sem_wait(sem.producer_sem);
err = pthread_create(threads_pool.pool + i % threads_pool.pool_size, NULL, sell_tickets, &agents[i]);
if (err != 0) {
fprintf(stdout, "error[%d]: can't create thread.\n", err);
fflush(stdout);
pthread_cancel(producer_consumer.consumer);
pthread_exit(NULL);
} else {
tid = *(threads_pool.pool + i % threads_pool.pool_size);
fprintf(stdout, "thread of agent@%d created, thread id = @%lx\n", id, (unsigned long)tid);
fflush(stdout);
sem_post(sem.consumer_sem);
}
}
/* wait for checkout() thread to finish */
pthread_join(producer_consumer.consumer, NULL);
pthread_mutex_destroy(&tickets_pool_lock);
if (tickets_pool == 0) {
fprintf(stdout, "Today's tickets are sold out!\n");
fflush(stdout);
} else {
fprintf(stdout, "%d tickets left at the end of the day!\n", tickets_pool);
fflush(stdout);
}
pthread_exit(NULL);
}
/**
* consumer thread
* wait for old agent sell_tickets() thread to exit and increment the producer semaphore
*/
static void *consumer(void *arg) {
/* barrier assure that checkin() and checkout() thread begin to work when two threads are both created. */
sem_post(barrier.producer_b);
sem_wait(barrier.consumer_b);
void *tret; // to store pthread_exit() exit status rval_ptr
for (unsigned i = 0; i < project.num_agents; i++) {
sem_wait(sem.consumer_sem);
pthread_join(*(threads_pool.pool + i % threads_pool.pool_size), &tret);
sem_post(sem.producer_sem);
fprintf(stdout, "agent@%d has finished his job!\n", *(unsigned *)tret);
fflush(stdout);
}
pthread_exit(NULL);
}
/**
* maximum 10 agent sell_tickets() threads work at a time
*/
static void run(void) {
/* project */
project.num_agents = 30;
project.num_tickets = 300;
/* threads pool */
threads_pool.pool_size = 10;
pthread_t pool[threads_pool.pool_size];
threads_pool.pool = &pool[0];
/* producer-consumer semaphore */
sem.producer_sem = sem_open("/producer_sem", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, threads_pool.pool_size);
sem.consumer_sem = sem_open("/consumer_sem", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, 0);
/* barrier */
barrier.producer_b = sem_open("/producer_b", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, 0);
barrier.consumer_b = sem_open("/consumer_b", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, 0);
/* create producer(), consumer() threads */
int producer_err = pthread_create(&producer_consumer.producer, NULL, producer, NULL);
int consumer_err = pthread_create(&producer_consumer.consumer, NULL, consumer, NULL);
if (producer_err != 0 || consumer_err != 0) {
if (producer_err != 0) {
fprintf(stdout, "error: checkin() thread creation failed!\n");
fflush(stdout);
}
if (consumer_err != 0) {
fprintf(stdout, "error: checkout() thread creation failed!\n");
fflush(stdout);
}
sem_unlink("/producer_sem");
sem_unlink("/consumer_sem");
sem_unlink("/producer_b");
sem_unlink("/consumer_b");
return;
}
/*
* wait for checkin() and checkout() to exit
*/
pthread_join(producer_consumer.producer, NULL);
pthread_join(producer_consumer.consumer, NULL);
/*
* free resources
* --------------
* Always use sem_unlink() to remove the semaphore, but not sem_close().
* Close a semaphore does NOT remove the semaphore from system shared memory.
* Close operation occurs automatically on process termination, we don't have
* to write sem_close() explicitly.
*/
sem_unlink("/producer_sem");
sem_unlink("/consumer_sem");
sem_unlink("/producer_b");
sem_unlink("/consumer_b");
}
int main(void) {
run();
}
在下面的代码中,每个线程运行 sell_tickets()
函数,获取互斥锁并减少票数。为了限制活动线程数,checkin()
和checkout()
线程是使用信号量的生产者-消费者模型。
但是在单元测试中,似乎消费者函数 checkout()
在生产者 checkin()
甚至创建新线程之前调用了 pthread_join()
函数。所以有什么不对吗?是因为线程不共享堆栈内存段,因为我没有 malloc()
堆 space 一些参数。
/**
* Using semaphore to limit maximum thread number.
*/
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>
/**
* If arguments that pthread_create() pass to sell_tickets() are more than one,
* they need to be wrapped into a single structure.
*/
typedef struct {
unsigned agent_id; // simulate an agent
unsigned tickets_tosell; // agent's personal goal of the day
pthread_mutex_t *pool_lock; // mutex lock for visiting the shared tickets pool
unsigned *tickets_pool; // shared tickets pool
} agent;
/**
* Constructor
*/
static void new_agent(agent *a, unsigned agentid, unsigned ticketsnum, pthread_mutex_t *lock, unsigned *tickets_pool) {
a->agent_id = agentid;
a->tickets_tosell = ticketsnum;
a->pool_lock = lock;
a->tickets_pool = tickets_pool;
}
/**
* Implement void *(*start_rtn)(void *);
* -------------------------------------
* Each thread execute this function.
*/
static void *sell_tickets(void *agent_addr) {
agent *a = (agent *)agent_addr;
while (a->tickets_tosell > 0) {
pthread_mutex_lock(a->pool_lock);
(*a->tickets_pool)--;
printf("agent@%d sells a ticket, %d tickets left in pool.\n", a->agent_id, *a->tickets_pool);
pthread_mutex_unlock(a->pool_lock);
a->tickets_tosell--;
printf("agent@%d has %d tickets to sell.\n", a->agent_id, a->tickets_tosell);
}
pthread_exit((void *)&a->agent_id);
}
typedef struct {
/* shared threads pool */
pthread_t *pool_addr;
unsigned pool_size;
/* a pair of producer-consumer semaphore */
sem_t *producer_sem;
sem_t *consumer_sem;
/* producer & consumer thread id */
pthread_t *producer_tid;
pthread_t *consumer_tid;
} threads_pool;
static void new_threads_pool(threads_pool *tp,
pthread_t *pool_addr, unsigned pool_size,
sem_t *producer_sem, sem_t *consumer_sem,
pthread_t *ptid, pthread_t *ctid) {
tp->pool_addr = pool_addr;
tp->pool_size = pool_size;
tp->producer_sem = producer_sem;
tp->consumer_sem = consumer_sem;
tp->producer_tid = ptid;
tp->consumer_tid = ctid;
}
typedef struct {
sem_t *checkin_b;
sem_t *checkout_b;
} barrier;
static void new_barrier(barrier *b, sem_t *inb, sem_t *outb) {
b->checkin_b = inb;
b->checkout_b = outb;
}
typedef struct {
unsigned num_agents;
unsigned num_tickets;
} project;
static void new_project(project *p, unsigned num_agents, unsigned num_tickets) {
p->num_agents = num_agents;
p->num_tickets = num_tickets;
}
typedef struct {
project *pj;
threads_pool *tp;
barrier *b;
} project_params;
static void new_project_params(project_params *pp, project *pj, threads_pool *tp, barrier *b) {
pp->pj = pj;
pp->tp = tp;
pp->b = b;
}
/**
* producer thread
* create agent sell_tickets() threads
*/
static void *checkin(void *params) {
project_params *pp = (project_params *)params;
/* barrier assure that checkin() and checkout() thread begin to work when two threads are both created. */
sem_post(pp->b->checkout_b);
sem_wait(pp->b->checkin_b);
unsigned tickets_pool = pp->pj->num_tickets; // shared resourses
pthread_mutex_t tickets_pool_lock; // mutex object
agent agents[pp->pj->num_agents]; // arguments pass to 10 threads
unsigned id; // agent id
pthread_t tid; // current sell_tickets() thread id
int err; // thread_create() function return the error code
pthread_mutex_init(&tickets_pool_lock, NULL);
for (unsigned i = 0; i < pp->pj->num_agents; i++) {
id = i + 1;
new_agent(&agents[i], id, pp->pj->num_tickets / pp->pj->num_agents, &tickets_pool_lock, &tickets_pool);
sem_wait(pp->tp->producer_sem);
err = pthread_create(pp->tp->pool_addr + (i % pp->tp->pool_size), NULL, sell_tickets, &agents[i]);
if (err != 0) {
printf("error[%d]: can't create thread.", err);
pthread_cancel(*pp->tp->consumer_tid);
pthread_exit(NULL);
} else {
tid = *(pp->tp->pool_addr + (i % pp->tp->pool_size));
printf("thread of agent@%d created, thread id = @%lx\n", id, (unsigned long)tid);
sem_post(pp->tp->consumer_sem);
}
}
/* wait for checkout() thread to finish */
pthread_join(*pp->tp->consumer_tid, NULL);
pthread_mutex_destroy(&tickets_pool_lock);
if (tickets_pool == 0) {
printf("Today's tickets are sold out!\n");
} else {
printf("%d tickets left at the end of the day!\n", tickets_pool);
}
pthread_exit(NULL);
}
/**
* consumer thread
* wait for old agent sell_tickets() thread to exit and increment the producer semaphore
*/
static void *checkout(void *params) {
project_params *pp = (project_params *)params;
/* barrier assure that checkin() and checkout() thread begin to work when two threads are both created. */
sem_post(pp->b->checkin_b);
sem_wait(pp->b->checkout_b);
void *tret; // to store pthread_exit() exit status rval_ptr
int errcode; // pthread_join() return err code
for (unsigned i = 0; i < pp->pj->num_agents; i++) {
sem_wait(pp->tp->consumer_sem);
errcode = pthread_join(*(pp->tp->pool_addr + (i % pp->tp->pool_size)), &tret);
sem_post(pp->tp->producer_sem);
printf("agent@%d has finished his job!\n", *(unsigned *)tret);
}
pthread_exit(NULL);
}
/**
* producer-consumer model using semaphore
* maximum 10 agent sell_tickets() threads work at a time
*/
static void run(void) {
unsigned num_agents = 30;
unsigned num_tickets = 300;
/*
* project
*/
project pj;
new_project(&pj, num_agents, num_tickets);
/*
* thread pool of maximum 10 thread
*/
unsigned threads_pool_size = 10;
pthread_t pool[threads_pool_size];
/*
* checkin() and checkout() threads
*/
pthread_t checkin_tid, checkout_tid;
int checkin_err, checkout_err;
/* a pair of producer-consumer semaphores */
sem_t *checkin_sem, *checkout_sem;
checkin_sem = sem_open("/checkin_sem", O_CREAT, S_IRWXG, threads_pool_size);
checkout_sem = sem_open("/checkout_sem", O_CREAT, S_IRWXG, 0);
/* barrier */
barrier b;
sem_t *checkin_b, *checkout_b;
checkin_b = sem_open("/checkin_b", O_CREAT, S_IRWXG, 0);
checkout_b = sem_open("/checkout_b", O_CREAT, S_IRWXG, 0);
new_barrier(&b, checkin_b, checkout_b);
/* wrap checkin() checkout() parameters into a single structure */
threads_pool tp;
new_threads_pool(&tp, &pool[0], threads_pool_size, checkin_sem, checkout_sem, &checkin_tid, &checkout_tid);
project_params pp;
new_project_params(&pp, &pj, &tp, &b);
/* create checkin(), checkout() threads */
checkin_err = pthread_create(&checkin_tid, NULL, checkin, &pp);
checkout_err = pthread_create(&checkout_tid, NULL, checkout, &pp);
if (checkin_err != 0 || checkout_err != 0) {
if (checkin_err != 0) printf("error: checkin() thread creation failed!\n");
if (checkout_err != 0) printf("error: checkout() thread creation failed!\n");
sem_close(checkin_sem);
sem_close(checkout_sem);
sem_close(checkin_b);
sem_close(checkout_b);
return;
}
/*
* wait for checkin() and checkout() to exit
*/
pthread_join(checkin_tid, NULL);
pthread_join(checkout_tid, NULL);
/*
* free resources
*/
sem_close(checkin_sem);
sem_close(checkout_sem);
sem_close(checkin_b);
sem_close(checkout_b);
}
int main(void) {
run();
}
lldb
显示 thread_join()
return 错误代码 3
:ESRCH
.
Process 46631 stopped
* thread #3, stop reason = EXC_BAD_ACCESS (code=1, address=0x0)
frame #0: 0x0000000100003b39 agtktp`checkout(params=0x00007ffeefbff618) at agents_tickets_pool.c:221:46
217 errcode = pthread_join(*(pp->tp->pool_addr + (i % pp->tp->pool_size)), &tret);
218 pthread_t tid = *(pp->tp->pool_addr + (i % pp->tp->pool_size));
219 printf("thread@%u = %lx\n", i + 1, (unsigned long)tid);
220 sem_post(pp->tp->producer_sem);
-> 221 printf("agent@%d has finished his job!\n", *(unsigned *)tret);
222 }
223 pthread_exit(NULL);
224 }
Target 0: (agtktp) stopped.
(lldb) fr v errcode
(int) errcode = 3
(lldb) fr v tid
(pthread_t) tid = 0x0000000000000050
=== 2021 年 1 月 3 日更新 2 ===
我已经发布了解决方案。如果有什么不对,请随时纠正我。谢谢大家,特别是@Semion。
=== 2021 年 1 月 3 日更新 2 结束 ===
=== 2021 年 1 月 3 日更新 1 ===
目前我已经修复了这个问题,并发现我在下面的代码中犯了一个典型的错误。
我需要一些时间来重新编辑这个问题并给出答案,以使其对以后的 POSIX Semaphore 用户有用。请不要 关闭这个问题(它已经有 2 个接近的投票)。谢谢。
=== 2021 年 1 月 3 日更新 1 结束 ===
错误现已修复。问题出在 sem_close()
.
根据“Unix Networking Program - Vol.2 - ch10”,sem_close()
关闭信号量,但不会将其从系统共享内存中删除。 POSIX 命名信号量是“kernel-persistent”。
所以基本上我为每个 运行 使用了同一对旧信号量,称为 /checkin_sem
和 checkout_sem
。一旦程序异常退出,在信号量中留下一些奇怪的值,后续的所有测试都会崩溃。这就是为什么其他一些人说该程序在他们的机器上运行良好的原因。因为他们的系统上没有遗留信号量。
为了解决这个问题,我们应该使用sem_unlink()
而不是,这样信号量名称将从系统中删除,信号量的破坏将发生直到最后 sem_close()
发生。由于关闭操作会在进程终止时自动发生,因此我们不必显式编写 sem_close()
。
sem_unlink("/checkin_sem");
sem_unlink("/checkout_sem");
sem_unlink("/checkin_b");
sem_unlink("/checkout_b");
并且为了防止遗留信号量影响我们的程序,最好在创建信号量时添加O_EXCL
,当信号量已经存在时会弹出错误。
checkin_sem = sem_open("/checkin_sem", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, threads_pool_size);
checkout_sem = sem_open("/checkout_sem", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, 0);
最后,对于权限标志,如果程序不涉及进程间通信,所有者的简单read
、write
即可,即S_IREAD | S_IWRITE
或 S_IRUSR | S_IWUSR
.
这是这个生产者-消费者线程池的最终版本。
/**
* Using semaphore to limit maximum thread number.
*/
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
/**
* If arguments that pthread_create() pass to sell_tickets() are more than one,
* they need to be wrapped into a single structure.
*/
typedef struct {
unsigned agent_id; // simulate an agent
unsigned tickets_tosell; // agent's personal goal of the day
unsigned *tickets_pool; // shared tickets pool
pthread_mutex_t *pool_lock; // mutex lock for visiting the shared tickets pool
} agent;
/**
* Constructor
*/
static void new_agent(agent *a, unsigned agentid, unsigned tickets_num, unsigned *pool, pthread_mutex_t *lock) {
a->agent_id = agentid;
a->tickets_tosell = tickets_num;
a->tickets_pool = pool;
a->pool_lock = lock;
}
/**
* Implement void *(*start_rtn)(void *);
* -------------------------------------
* Each thread execute this function.
*/
static void *sell_tickets(void *agent_addr) {
agent *a = (agent *)agent_addr;
while (a->tickets_tosell > 0) {
pthread_mutex_lock(a->pool_lock); // begin of race condition
(*a->tickets_pool)--;
fprintf(stdout, "agent@%d sells a ticket, %d tickets left in pool.\n", a->agent_id, *a->tickets_pool);
fflush(stdout);
pthread_mutex_unlock(a->pool_lock); // end of race condition
a->tickets_tosell--;
fprintf(stdout, "agent@%d has %d tickets to sell.\n", a->agent_id, a->tickets_tosell);
fflush(stdout);
}
pthread_exit((void *)&a->agent_id);
}
/**
* Simulate a tickets selling project
*/
struct {
unsigned num_agents;
unsigned num_tickets;
} project;
/**
* Shared threads pool
*/
struct {
unsigned pool_size;
pthread_t *pool;
} threads_pool;
/**
* Producer, consumer threads id
*/
struct {
pthread_t producer;
pthread_t consumer;
} producer_consumer;
/**
* A pair of semaphore
*/
struct {
sem_t *producer_sem;
sem_t *consumer_sem;
} sem;
/**
* Barrier is not implemented in mac os, we use a pair of semaphore instead.
*/
struct {
sem_t *producer_b;
sem_t *consumer_b;
} barrier;
/**
* producer thread
* create agent sell_tickets() threads
*/
static void *producer(void *arg) {
/* barrier assure that checkin() and checkout() thread begin to work when two threads are both created. */
sem_post(barrier.consumer_b);
sem_wait(barrier.producer_b);
unsigned tickets_pool = project.num_tickets; // shared resource for each agent
pthread_mutex_t tickets_pool_lock; // mutex lock for visiting tickets pool
agent agents[project.num_agents]; // arguments pass to 10 threads
unsigned id; // agent id
pthread_t tid; // current sell_tickets() thread id
int err; // thread_create() return err code
pthread_mutex_init(&tickets_pool_lock, NULL);
for (unsigned i = 0; i < project.num_agents; i++) {
id = i + 1;
new_agent(&agents[i], id, project.num_tickets / project.num_agents, &tickets_pool, &tickets_pool_lock);
sem_wait(sem.producer_sem);
err = pthread_create(threads_pool.pool + i % threads_pool.pool_size, NULL, sell_tickets, &agents[i]);
if (err != 0) {
fprintf(stdout, "error[%d]: can't create thread.\n", err);
fflush(stdout);
pthread_cancel(producer_consumer.consumer);
pthread_exit(NULL);
} else {
tid = *(threads_pool.pool + i % threads_pool.pool_size);
fprintf(stdout, "thread of agent@%d created, thread id = @%lx\n", id, (unsigned long)tid);
fflush(stdout);
sem_post(sem.consumer_sem);
}
}
/* wait for checkout() thread to finish */
pthread_join(producer_consumer.consumer, NULL);
pthread_mutex_destroy(&tickets_pool_lock);
if (tickets_pool == 0) {
fprintf(stdout, "Today's tickets are sold out!\n");
fflush(stdout);
} else {
fprintf(stdout, "%d tickets left at the end of the day!\n", tickets_pool);
fflush(stdout);
}
pthread_exit(NULL);
}
/**
* consumer thread
* wait for old agent sell_tickets() thread to exit and increment the producer semaphore
*/
static void *consumer(void *arg) {
/* barrier assure that checkin() and checkout() thread begin to work when two threads are both created. */
sem_post(barrier.producer_b);
sem_wait(barrier.consumer_b);
void *tret; // to store pthread_exit() exit status rval_ptr
for (unsigned i = 0; i < project.num_agents; i++) {
sem_wait(sem.consumer_sem);
pthread_join(*(threads_pool.pool + i % threads_pool.pool_size), &tret);
sem_post(sem.producer_sem);
fprintf(stdout, "agent@%d has finished his job!\n", *(unsigned *)tret);
fflush(stdout);
}
pthread_exit(NULL);
}
/**
* maximum 10 agent sell_tickets() threads work at a time
*/
static void run(void) {
/* project */
project.num_agents = 30;
project.num_tickets = 300;
/* threads pool */
threads_pool.pool_size = 10;
pthread_t pool[threads_pool.pool_size];
threads_pool.pool = &pool[0];
/* producer-consumer semaphore */
sem.producer_sem = sem_open("/producer_sem", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, threads_pool.pool_size);
sem.consumer_sem = sem_open("/consumer_sem", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, 0);
/* barrier */
barrier.producer_b = sem_open("/producer_b", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, 0);
barrier.consumer_b = sem_open("/consumer_b", O_CREAT | O_EXCL, S_IREAD | S_IWRITE, 0);
/* create producer(), consumer() threads */
int producer_err = pthread_create(&producer_consumer.producer, NULL, producer, NULL);
int consumer_err = pthread_create(&producer_consumer.consumer, NULL, consumer, NULL);
if (producer_err != 0 || consumer_err != 0) {
if (producer_err != 0) {
fprintf(stdout, "error: checkin() thread creation failed!\n");
fflush(stdout);
}
if (consumer_err != 0) {
fprintf(stdout, "error: checkout() thread creation failed!\n");
fflush(stdout);
}
sem_unlink("/producer_sem");
sem_unlink("/consumer_sem");
sem_unlink("/producer_b");
sem_unlink("/consumer_b");
return;
}
/*
* wait for checkin() and checkout() to exit
*/
pthread_join(producer_consumer.producer, NULL);
pthread_join(producer_consumer.consumer, NULL);
/*
* free resources
* --------------
* Always use sem_unlink() to remove the semaphore, but not sem_close().
* Close a semaphore does NOT remove the semaphore from system shared memory.
* Close operation occurs automatically on process termination, we don't have
* to write sem_close() explicitly.
*/
sem_unlink("/producer_sem");
sem_unlink("/consumer_sem");
sem_unlink("/producer_b");
sem_unlink("/consumer_b");
}
int main(void) {
run();
}