有没有办法在线程 creation/destruction 上调用库线程本地 init/cleanup?
Is there a way to call library thread-local init/cleanup on thread creation/destruction?
此问题类似于 How to call a function on a thread's creation and exit? but more specific. In another multi-process shared memory project I used a combination of an __attribute__((constructor)) labeled library init routine, lazy initialisation for each thread, and robust futexes,以确保资源不会泄漏到共享内存中,即使系统管理员选择 SIGKILL 使用它的进程之一。然而,APIs 中的 futexes way 对于我当前的项目来说太重量级了,甚至是一些绕过一些惰性初始化的指令也是我宁愿避免的。库 APIs 将在几个进程的几百个线程中被调用数万亿次(每个 API 只是几百条指令。)
我猜答案是否定的,但由于我花了几个小时寻找并没有找到明确的答案,我想我会在这里问,然后下一个寻找简单答案的人将能够更快找到它。
我的目标很简单:当线程在多个进程中异步创建时执行一些每线程初始化,并在线程被异步销毁时在某个时刻稳健地执行一些清理。不必立即发生,它最终会发生。
一些参与批判性思维的假设想法:从 __attribute__((constructor)) 标记的库 init func 调用的假设 pthread_atclone() 将满足第一个条件。以及对 futex()es 的扩展,以添加一个带有每个线程 futex_adj 值的类似 semop 的操作,如果 do_exit() 中的值非零,则会导致设置 FUTEX_OWNER_DIED对于 futex "semaphore" 允许在下次触摸 futex 时进行清理。
好吧,首先,您应该记录库用户不应该以这样一种方式异步终止线程,即他们不会明确释放属于您的库的资源(关闭句柄,无论如何),TBH,只是终止线程在进程终止之前是个坏主意。
更难检测整个进程在使用您的库时是否被 SIGKILLed。我目前最好的猜测是,所有希望使用您的库的进程都必须先登录,以便可以将它们的 pid 添加到容器中。使用在您的 lib 初始化时启动的线程,轮询已用 kill(pid,0) 消失的 pid,并进行任何适当的清理。它不是很令人满意,(我讨厌轮询),但我没有看到任何不是非常混乱的替代方案:(
经过研究和实验,据我所知,我提出了最新的 "best practice"。如果谁知道更好的,请评论!
对于第一部分,每线程初始化,我无法想出任何替代直接延迟初始化的方法。但是,我确实决定将分支移动到调用者会稍微更有效一些,这样新堆栈框架中的流水线操作就不会立即面临实际上不必要的分支。所以不是这个:
__thread int tInf = 0;
void
threadDoSomething(void *data)
{
if (!tInf) {
_threadInitInfo(&tInf);
}
/*l
* do Something.
*/
}
这个:
__thread int tInf = 0;
#define threadDoSomething(data) (((!tInf)?_threadInitInfo(&tInf):0), \
_threadDoSomething((data)))
void
_threadDoSomething(void *data)
{
/*l
* do Something.
*/
}
对这种欢迎的(不可否认的)有用性的评论!
对于第二部分,无论多么异步,当线程死机时都稳健地执行一些清理,我找不到比在文件描述符上进行收割过程 epoll_wait() 更好的解决方案读取通过抽象 UNIX 域套接字地址上的 sendmsg() 调用中的 SCM_RIGHTS 控制消息传递给它的开放管道的末端。听起来很复杂,但还不错,这是客户端:
/*m
* Client that registers a thread with a server who will do cleanup of a
* shared interprocess object even if the thread dies asynchronously.
*/
#include <sys/socket.h> // socket(), bind(), recvmsg()
#include <sys/syscall.h> // syscall()
#include <sys/un.h> // sockaddr_un
#include <stdint.h> // uint64_t
#include <fcntl.h> // O_CLOEXEC()
#include <malloc.h> // malloc()
#include <stdlib.h> // random()
#include <unistd.h> // close(), usleep()
#include <pthread.h> // pthread_create()
#include <tsteplsrv.h> // Our API.
char iovBuf[] = "SP1"; // 3 char buf to send client type
__thread pid_t cliTid = 0; // per-thread copy of self's Thread ID
/*f
* initClient() is called when we realise we need to lazily initialise
* our thread based on cliTid being zero.
*/
void *
initClient(void *ptr)
{
struct sockaddr_un svAddr;
struct msghdr msg;
struct iovec io;
struct cmsghdr *ctrMsg;
uint64_t ltid; // local 8-byte copy of the tid
int pfds[2], // two fds of our pipe
sfd; // socket fd
/*s
* This union is necessary to ensure that the buffer is aligned such that
* we can read cmsg_{len,level,type} from the cmsghdr without causing an
* alignment fault (SIGBUS.)
*/
union {
struct cmsghdr hdr;
char buf[CMSG_SPACE(sizeof(int))];
} ctrBuf;
pfds[0] = pfds[1] = sfd = -1;
/*l
* Get our Thread ID.
*/
ltid = (uint64_t)(cliTid = syscall(SYS_gettid));
/*l
* Set up an abstract unix domain socket address.
*/
svAddr.sun_family = AF_UNIX;
svAddr.sun_path[0] = '[=12=]';
strcpy(&svAddr.sun_path[1], EPLS_SRV_ADDR);
/*l
* Set up a socket datagram send buffer.
*/
io.iov_base = iovBuf;
io.iov_len = sizeof(iovBuf);
msg.msg_iov = &io;
msg.msg_iovlen = 1;
msg.msg_control = ctrBuf.buf;
msg.msg_controllen = sizeof(ctrBuf);
msg.msg_name = (struct sockaddr *)&svAddr,
msg.msg_namelen = (&svAddr.sun_path[0] - (char *)&svAddr)
+ 1
+ sizeof(EPLS_SRV_ADDR);
/*l
* Set up the control message header to indicate we are sharing a file
* descriptor.
*/
ctrMsg = CMSG_FIRSTHDR(&msg);
ctrMsg->cmsg_len = CMSG_LEN(sizeof(int));
ctrMsg->cmsg_level = SOL_SOCKET;
ctrMsg->cmsg_type = SCM_RIGHTS;
/*l
* Create file descriptors with pipe().
*/
if (-1 == pipe(pfds)) {
printErrMsg("TID: %d pipe() failed", cliTid);
} else {
/*l
* Write our tid to the pipe.
*/
memmove(CMSG_DATA(ctrMsg), &pfds[0], sizeof(int));
if (-1 == write(pfds[1], <id, sizeof(uint64_t))) {
printErrMsg("TID: %d write() failed", cliTid);
} if (-1 == (sfd = socket(AF_UNIX, SOCK_DGRAM, 0))) {
printErrMsg("TID: %d socket() failed", cliTid);
} else if (-1 == sendmsg(sfd, &msg, 0)) {
printErrMsg("TID: %d sendmsg() failed", cliTid);
} else {
printVerbMsg("TID: %d sent write fd %d to server kept read fd %d",
cliTid,
pfds[0],
pfds[1]);
/*l
* Close the read end of the pipe, the server has it now.
*/
close(pfds[0]);
pfds[0] = -1;
}
}
if (-1 != pfds[1]) close(pfds[1]);
if (-1 != pfds[0]) close(pfds[0]);
if (-1 != sfd) close(sfd);
return (void *)0;
}
以及死神的代码:
/*m
* Abstract datagram socket listening for FD's from clients.
*/
#include <sys/socket.h> // socket(), bind(), recvmsg()
#include <sys/epoll.h> // epoll_{create,wait}()
#include <sys/un.h> // sockaddr_un
#include <malloc.h> // malloc()
#include <unistd.h> // close()
#include <tsteplsrv.h> // Our API.
/*s
* socket datagram structs for receiving structured messages used to transfer
* fds from our clients.
*/
struct msghdr msg = { 0 };
struct iovec io = { 0 };
char iovBuf[EPLS_MSG_LEN]; // 3 char buf to receive client type
/*s
* This union is necessary to ensure that the buffer is aligned such that
* we can read cmsg_{len,level,type} from the cmsghdr without causing an
* alignment fault (SIGBUS.)
*/
union {
struct cmsghdr hdr;
char buf[CMSG_SPACE(sizeof(int))];
} ctrBuf;
typedef struct _tidFd_t {
struct _tidFd_t *next;
pid_t tid;
int fd;
} tidFd_t;
tidFd_t *tidFdLst = (tidFd_t *)0;
/*f
* Perform some handshaking with a new client and add the file descriptor
* it shared with us to the epoll set.
*/
static void
welcomeClient(int efd, int cfd)
{
uint64_t tid;
tidFd_t *tfd;
struct epoll_event epEv;
tfd = (tidFd_t *)-1;
/*l
* The fd is a pipe and should be readable, and should contain the
* tid of the client.
*/
if (-1 != read(cfd, &tid, sizeof(tid)) && (tfd = malloc(sizeof(*tfd)))) {
tfd->fd = cfd;
tfd->tid = (pid_t)tid;
tfd->next = tidFdLst;
/*l
* Single threaded process, no race condition here.
*/
tidFdLst = tfd;
/*l
* Add the fd to the epoll() set so that we will be woken up with
* an error if the thread dies.
*/
epEv.events = EPOLLIN;
epEv.data.fd = cfd;
if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, cfd, &epEv)) {
printErrMsg("TID: %ld Could not register fd %d with epoll set",
tid,
cfd);
} else {
printVerbMsg("TID: %ld Registered fd %d with epoll set", tid, cfd);
}
/*l
* Couldn't allocate memory for the new client.
*/
} else if (!tfd) {
printErrMsg("Could not allocate memory for new client");
/*l
* Could not read from the eventfd() file descriptor.
*/
} else {
printErrMsg("Could not read from client file descriptor");
}
}
/*f
* Perform some handshaking with a new client and add the file descriptor
* it shared with us to the epoll set.
*/
static void
processClientEvent(int efd, struct epoll_event *epEv)
{
tidFd_t *tfd, **bLnk;
/*l
* Walk the list of per-tid fd structs.
*/
for (bLnk = &tidFdLst; (tfd = *bLnk); bLnk = &tfd->next)
if (tfd->fd == epEv->data.fd)
break;
if (!tfd) {
printErrMsg("client file descriptor %d not found on the tfd list!",
epEv->data.fd);
/*l
* If we received an EPOLLHUP on the fd, cleanup.
*/
} else if (epEv->events & EPOLLHUP) {
/*l
* Try to remove the tid's pipe fd from the epoll set.
*/
if (-1 == epoll_ctl(efd, EPOLL_CTL_DEL, epEv->data.fd, epEv)) {
printErrMsg("couldn't delete epoll for tid %d", tfd->tid);
/*l
* Do tid cleanup here.
*/
} else {
printVerbMsg("TID: %d closing fd: %d", tfd->tid, epEv->data.fd);
close(epEv->data.fd);
/*l
* Remove the per-tid struct from the list and free it.
*/
*bLnk = tfd->next;
free(tfd);
}
} else {
printVerbMsg("TID: %d Received unexpected epoll event %d",
tfd->tid,
epEv->events);
}
}
/*f
* Create and listen on a datagram socket for eventfd() file descriptors
* from clients.
*/
int
main(int argc, char *argv[])
{
struct sockaddr_un svAddr;
struct cmsghdr *ctrMsg;
struct epoll_event *epEv,
epEvs[EPLS_MAX_EPEVS];
int sfd, efd, cfd, nfds;
sfd = efd = -1;
/*l
* Set up an abstract unix domain socket address.
*/
svAddr.sun_family = AF_UNIX;
svAddr.sun_path[0] = '[=13=]';
strcpy(&svAddr.sun_path[1], EPLS_SRV_ADDR);
/*l
* Set up a socket datagram receive buffer.
*/
io.iov_base = iovBuf; // 3-char buffer to ID client type
io.iov_len = sizeof(iovBuf);
msg.msg_name = (char *)0; // No need for the client addr
msg.msg_namelen = 0;
msg.msg_iov = &io; // single IO vector in the S/G array
msg.msg_iovlen = 1;
msg.msg_control = ctrBuf.buf; // Control message buffer
msg.msg_controllen = sizeof(ctrBuf);
/*l
* Set up an epoll event.
*/
epEv = &epEvs[0];
epEv->events = EPOLLIN;
/*l
* Create a socket to receive datagrams on and register the socket
* with our epoll event.
*/
if (-1 == (epEv->data.fd = sfd = socket(AF_UNIX, SOCK_DGRAM, 0))) {
printErrMsg("socket creation failed");
/*l
* Bind to the abstract address. The pointer math is to portably
* handle weird structure packing _just_in_case_.
*/
} else if (-1 == bind(sfd,
(struct sockaddr *)&svAddr,
(&svAddr.sun_path[0] - (char *)&svAddr)
+ 1
+ sizeof(EPLS_SRV_ADDR))) {
printErrMsg("could not bind address: %s", &svAddr.sun_path[1]);
/*l
* Create an epoll interface. Set CLOEXEC for tidiness in case a thread
* in the server fork()s and exec()s.
*/
} else if (-1 == (efd = epoll_create1(EPOLL_CLOEXEC))) {
printErrMsg("could not create epoll instance");
/*l
* Add our socket fd to the epoll instance.
*/
} else if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, sfd, epEv)) {
printErrMsg("could not add socket to epoll instance");
/*l
* Loop receiving events on our epoll instance.
*/
} else {
printVerbMsg("server listening on abstract address: %s",
&svAddr.sun_path[1]);
/*l
* Loop forever listening for events on the fds we are interested
* in.
*/
while (-1 != (nfds = epoll_wait(efd, epEvs, EPLS_MAX_EPEVS, -1))) {
/*l
* For each fd with an event, figure out what's up!
*/
do {
/*l
* Transform nfds from a count to an index.
*/
--nfds;
/*l
* If the fd with an event is the listening socket a client
* is trying to send us their eventfd() file descriptor.
*/
if (sfd == epEvs[nfds].data.fd) {
if (EPOLLIN != epEvs[nfds].events) {
printErrMsg("unexpected condition on socket: %d",
epEvs[nfds].events);
nfds = -1;
break;
}
/*l
* Reset the sizes of the receive buffers to their
* actual value; on return they will be set to the
* read value.
*/
io.iov_len = sizeof(iovBuf);
msg.msg_controllen = sizeof(ctrBuf);
/*l
* Receive the waiting message.
*/
if (-1 == recvmsg(sfd, &msg, MSG_CMSG_CLOEXEC)) {
printVerbMsg("failed datagram read on socket");
/*l
* Verify that the message's control buffer contains
* a file descriptor.
*/
} else if ( NULL != (ctrMsg = CMSG_FIRSTHDR(&msg))
&& CMSG_LEN(sizeof(int)) == ctrMsg->cmsg_len
&& SOL_SOCKET == ctrMsg->cmsg_level
&& SCM_RIGHTS == ctrMsg->cmsg_type) {
/*l
* Unpack the file descriptor.
*/
memmove(&cfd, CMSG_DATA(ctrMsg), sizeof(cfd));
printVerbMsg("Received fd %d from client type %c%c%c",
cfd,
((char *)msg.msg_iov->iov_base)[0],
((char *)msg.msg_iov->iov_base)[1],
((char *)msg.msg_iov->iov_base)[2]);
/*l
* Process the incoming file descriptor and add
* it to the epoll() list.
*/
welcomeClient(efd, cfd);
/*l
* Note but ignore incorrectly formed datagrams.
*/
} else {
printVerbMsg("could not extract file descriptor "
"from client's datagram");
}
/*l
* The epoll() event is on one of the file descriptors
* shared with a client, process it.
*/
} else {
processClientEvent(efd, &epEvs[nfds]);
}
} while (nfds);
/*l
* If something happened to our socket break the epoll_wait()
* loop.
*/
if (nfds)
break;
}
}
/*l
* An error occurred, cleanup.
*/
if (-1 != efd)
close(efd);
if (-1 != sfd)
close(sfd);
return -1;
}
起初我尝试使用 eventfd() 而不是 pipe() 但 eventfd 文件描述符表示对象而不是连接,因此在客户端代码中关闭 fd 不会在 reaper 中产生 EPOLLHUP。如果有人为此知道 pipe() 的更好替代方法,请告诉我!
为了完整起见,这里是用于构造抽象地址的#defines:
/*d
* server abstract address.
*/
#define EPLS_SRV_NAM "_abssSrv"
#define EPLS_SRV_VER "0.0.1"
#define EPLS_SRV_ADDR EPLS_SRV_NAM "." EPLS_SRV_NAM
#define EPLS_MSG_LEN 3
#define EPLS_MAX_EPEVS 32
就是这样,希望对大家有用。
此问题类似于 How to call a function on a thread's creation and exit? but more specific. In another multi-process shared memory project I used a combination of an __attribute__((constructor)) labeled library init routine, lazy initialisation for each thread, and robust futexes,以确保资源不会泄漏到共享内存中,即使系统管理员选择 SIGKILL 使用它的进程之一。然而,APIs 中的 futexes way 对于我当前的项目来说太重量级了,甚至是一些绕过一些惰性初始化的指令也是我宁愿避免的。库 APIs 将在几个进程的几百个线程中被调用数万亿次(每个 API 只是几百条指令。)
我猜答案是否定的,但由于我花了几个小时寻找并没有找到明确的答案,我想我会在这里问,然后下一个寻找简单答案的人将能够更快找到它。
我的目标很简单:当线程在多个进程中异步创建时执行一些每线程初始化,并在线程被异步销毁时在某个时刻稳健地执行一些清理。不必立即发生,它最终会发生。
一些参与批判性思维的假设想法:从 __attribute__((constructor)) 标记的库 init func 调用的假设 pthread_atclone() 将满足第一个条件。以及对 futex()es 的扩展,以添加一个带有每个线程 futex_adj 值的类似 semop 的操作,如果 do_exit() 中的值非零,则会导致设置 FUTEX_OWNER_DIED对于 futex "semaphore" 允许在下次触摸 futex 时进行清理。
好吧,首先,您应该记录库用户不应该以这样一种方式异步终止线程,即他们不会明确释放属于您的库的资源(关闭句柄,无论如何),TBH,只是终止线程在进程终止之前是个坏主意。
更难检测整个进程在使用您的库时是否被 SIGKILLed。我目前最好的猜测是,所有希望使用您的库的进程都必须先登录,以便可以将它们的 pid 添加到容器中。使用在您的 lib 初始化时启动的线程,轮询已用 kill(pid,0) 消失的 pid,并进行任何适当的清理。它不是很令人满意,(我讨厌轮询),但我没有看到任何不是非常混乱的替代方案:(
经过研究和实验,据我所知,我提出了最新的 "best practice"。如果谁知道更好的,请评论!
对于第一部分,每线程初始化,我无法想出任何替代直接延迟初始化的方法。但是,我确实决定将分支移动到调用者会稍微更有效一些,这样新堆栈框架中的流水线操作就不会立即面临实际上不必要的分支。所以不是这个:
__thread int tInf = 0;
void
threadDoSomething(void *data)
{
if (!tInf) {
_threadInitInfo(&tInf);
}
/*l
* do Something.
*/
}
这个:
__thread int tInf = 0;
#define threadDoSomething(data) (((!tInf)?_threadInitInfo(&tInf):0), \
_threadDoSomething((data)))
void
_threadDoSomething(void *data)
{
/*l
* do Something.
*/
}
对这种欢迎的(不可否认的)有用性的评论!
对于第二部分,无论多么异步,当线程死机时都稳健地执行一些清理,我找不到比在文件描述符上进行收割过程 epoll_wait() 更好的解决方案读取通过抽象 UNIX 域套接字地址上的 sendmsg() 调用中的 SCM_RIGHTS 控制消息传递给它的开放管道的末端。听起来很复杂,但还不错,这是客户端:
/*m
* Client that registers a thread with a server who will do cleanup of a
* shared interprocess object even if the thread dies asynchronously.
*/
#include <sys/socket.h> // socket(), bind(), recvmsg()
#include <sys/syscall.h> // syscall()
#include <sys/un.h> // sockaddr_un
#include <stdint.h> // uint64_t
#include <fcntl.h> // O_CLOEXEC()
#include <malloc.h> // malloc()
#include <stdlib.h> // random()
#include <unistd.h> // close(), usleep()
#include <pthread.h> // pthread_create()
#include <tsteplsrv.h> // Our API.
char iovBuf[] = "SP1"; // 3 char buf to send client type
__thread pid_t cliTid = 0; // per-thread copy of self's Thread ID
/*f
* initClient() is called when we realise we need to lazily initialise
* our thread based on cliTid being zero.
*/
void *
initClient(void *ptr)
{
struct sockaddr_un svAddr;
struct msghdr msg;
struct iovec io;
struct cmsghdr *ctrMsg;
uint64_t ltid; // local 8-byte copy of the tid
int pfds[2], // two fds of our pipe
sfd; // socket fd
/*s
* This union is necessary to ensure that the buffer is aligned such that
* we can read cmsg_{len,level,type} from the cmsghdr without causing an
* alignment fault (SIGBUS.)
*/
union {
struct cmsghdr hdr;
char buf[CMSG_SPACE(sizeof(int))];
} ctrBuf;
pfds[0] = pfds[1] = sfd = -1;
/*l
* Get our Thread ID.
*/
ltid = (uint64_t)(cliTid = syscall(SYS_gettid));
/*l
* Set up an abstract unix domain socket address.
*/
svAddr.sun_family = AF_UNIX;
svAddr.sun_path[0] = '[=12=]';
strcpy(&svAddr.sun_path[1], EPLS_SRV_ADDR);
/*l
* Set up a socket datagram send buffer.
*/
io.iov_base = iovBuf;
io.iov_len = sizeof(iovBuf);
msg.msg_iov = &io;
msg.msg_iovlen = 1;
msg.msg_control = ctrBuf.buf;
msg.msg_controllen = sizeof(ctrBuf);
msg.msg_name = (struct sockaddr *)&svAddr,
msg.msg_namelen = (&svAddr.sun_path[0] - (char *)&svAddr)
+ 1
+ sizeof(EPLS_SRV_ADDR);
/*l
* Set up the control message header to indicate we are sharing a file
* descriptor.
*/
ctrMsg = CMSG_FIRSTHDR(&msg);
ctrMsg->cmsg_len = CMSG_LEN(sizeof(int));
ctrMsg->cmsg_level = SOL_SOCKET;
ctrMsg->cmsg_type = SCM_RIGHTS;
/*l
* Create file descriptors with pipe().
*/
if (-1 == pipe(pfds)) {
printErrMsg("TID: %d pipe() failed", cliTid);
} else {
/*l
* Write our tid to the pipe.
*/
memmove(CMSG_DATA(ctrMsg), &pfds[0], sizeof(int));
if (-1 == write(pfds[1], <id, sizeof(uint64_t))) {
printErrMsg("TID: %d write() failed", cliTid);
} if (-1 == (sfd = socket(AF_UNIX, SOCK_DGRAM, 0))) {
printErrMsg("TID: %d socket() failed", cliTid);
} else if (-1 == sendmsg(sfd, &msg, 0)) {
printErrMsg("TID: %d sendmsg() failed", cliTid);
} else {
printVerbMsg("TID: %d sent write fd %d to server kept read fd %d",
cliTid,
pfds[0],
pfds[1]);
/*l
* Close the read end of the pipe, the server has it now.
*/
close(pfds[0]);
pfds[0] = -1;
}
}
if (-1 != pfds[1]) close(pfds[1]);
if (-1 != pfds[0]) close(pfds[0]);
if (-1 != sfd) close(sfd);
return (void *)0;
}
以及死神的代码:
/*m
* Abstract datagram socket listening for FD's from clients.
*/
#include <sys/socket.h> // socket(), bind(), recvmsg()
#include <sys/epoll.h> // epoll_{create,wait}()
#include <sys/un.h> // sockaddr_un
#include <malloc.h> // malloc()
#include <unistd.h> // close()
#include <tsteplsrv.h> // Our API.
/*s
* socket datagram structs for receiving structured messages used to transfer
* fds from our clients.
*/
struct msghdr msg = { 0 };
struct iovec io = { 0 };
char iovBuf[EPLS_MSG_LEN]; // 3 char buf to receive client type
/*s
* This union is necessary to ensure that the buffer is aligned such that
* we can read cmsg_{len,level,type} from the cmsghdr without causing an
* alignment fault (SIGBUS.)
*/
union {
struct cmsghdr hdr;
char buf[CMSG_SPACE(sizeof(int))];
} ctrBuf;
typedef struct _tidFd_t {
struct _tidFd_t *next;
pid_t tid;
int fd;
} tidFd_t;
tidFd_t *tidFdLst = (tidFd_t *)0;
/*f
* Perform some handshaking with a new client and add the file descriptor
* it shared with us to the epoll set.
*/
static void
welcomeClient(int efd, int cfd)
{
uint64_t tid;
tidFd_t *tfd;
struct epoll_event epEv;
tfd = (tidFd_t *)-1;
/*l
* The fd is a pipe and should be readable, and should contain the
* tid of the client.
*/
if (-1 != read(cfd, &tid, sizeof(tid)) && (tfd = malloc(sizeof(*tfd)))) {
tfd->fd = cfd;
tfd->tid = (pid_t)tid;
tfd->next = tidFdLst;
/*l
* Single threaded process, no race condition here.
*/
tidFdLst = tfd;
/*l
* Add the fd to the epoll() set so that we will be woken up with
* an error if the thread dies.
*/
epEv.events = EPOLLIN;
epEv.data.fd = cfd;
if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, cfd, &epEv)) {
printErrMsg("TID: %ld Could not register fd %d with epoll set",
tid,
cfd);
} else {
printVerbMsg("TID: %ld Registered fd %d with epoll set", tid, cfd);
}
/*l
* Couldn't allocate memory for the new client.
*/
} else if (!tfd) {
printErrMsg("Could not allocate memory for new client");
/*l
* Could not read from the eventfd() file descriptor.
*/
} else {
printErrMsg("Could not read from client file descriptor");
}
}
/*f
* Perform some handshaking with a new client and add the file descriptor
* it shared with us to the epoll set.
*/
static void
processClientEvent(int efd, struct epoll_event *epEv)
{
tidFd_t *tfd, **bLnk;
/*l
* Walk the list of per-tid fd structs.
*/
for (bLnk = &tidFdLst; (tfd = *bLnk); bLnk = &tfd->next)
if (tfd->fd == epEv->data.fd)
break;
if (!tfd) {
printErrMsg("client file descriptor %d not found on the tfd list!",
epEv->data.fd);
/*l
* If we received an EPOLLHUP on the fd, cleanup.
*/
} else if (epEv->events & EPOLLHUP) {
/*l
* Try to remove the tid's pipe fd from the epoll set.
*/
if (-1 == epoll_ctl(efd, EPOLL_CTL_DEL, epEv->data.fd, epEv)) {
printErrMsg("couldn't delete epoll for tid %d", tfd->tid);
/*l
* Do tid cleanup here.
*/
} else {
printVerbMsg("TID: %d closing fd: %d", tfd->tid, epEv->data.fd);
close(epEv->data.fd);
/*l
* Remove the per-tid struct from the list and free it.
*/
*bLnk = tfd->next;
free(tfd);
}
} else {
printVerbMsg("TID: %d Received unexpected epoll event %d",
tfd->tid,
epEv->events);
}
}
/*f
* Create and listen on a datagram socket for eventfd() file descriptors
* from clients.
*/
int
main(int argc, char *argv[])
{
struct sockaddr_un svAddr;
struct cmsghdr *ctrMsg;
struct epoll_event *epEv,
epEvs[EPLS_MAX_EPEVS];
int sfd, efd, cfd, nfds;
sfd = efd = -1;
/*l
* Set up an abstract unix domain socket address.
*/
svAddr.sun_family = AF_UNIX;
svAddr.sun_path[0] = '[=13=]';
strcpy(&svAddr.sun_path[1], EPLS_SRV_ADDR);
/*l
* Set up a socket datagram receive buffer.
*/
io.iov_base = iovBuf; // 3-char buffer to ID client type
io.iov_len = sizeof(iovBuf);
msg.msg_name = (char *)0; // No need for the client addr
msg.msg_namelen = 0;
msg.msg_iov = &io; // single IO vector in the S/G array
msg.msg_iovlen = 1;
msg.msg_control = ctrBuf.buf; // Control message buffer
msg.msg_controllen = sizeof(ctrBuf);
/*l
* Set up an epoll event.
*/
epEv = &epEvs[0];
epEv->events = EPOLLIN;
/*l
* Create a socket to receive datagrams on and register the socket
* with our epoll event.
*/
if (-1 == (epEv->data.fd = sfd = socket(AF_UNIX, SOCK_DGRAM, 0))) {
printErrMsg("socket creation failed");
/*l
* Bind to the abstract address. The pointer math is to portably
* handle weird structure packing _just_in_case_.
*/
} else if (-1 == bind(sfd,
(struct sockaddr *)&svAddr,
(&svAddr.sun_path[0] - (char *)&svAddr)
+ 1
+ sizeof(EPLS_SRV_ADDR))) {
printErrMsg("could not bind address: %s", &svAddr.sun_path[1]);
/*l
* Create an epoll interface. Set CLOEXEC for tidiness in case a thread
* in the server fork()s and exec()s.
*/
} else if (-1 == (efd = epoll_create1(EPOLL_CLOEXEC))) {
printErrMsg("could not create epoll instance");
/*l
* Add our socket fd to the epoll instance.
*/
} else if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, sfd, epEv)) {
printErrMsg("could not add socket to epoll instance");
/*l
* Loop receiving events on our epoll instance.
*/
} else {
printVerbMsg("server listening on abstract address: %s",
&svAddr.sun_path[1]);
/*l
* Loop forever listening for events on the fds we are interested
* in.
*/
while (-1 != (nfds = epoll_wait(efd, epEvs, EPLS_MAX_EPEVS, -1))) {
/*l
* For each fd with an event, figure out what's up!
*/
do {
/*l
* Transform nfds from a count to an index.
*/
--nfds;
/*l
* If the fd with an event is the listening socket a client
* is trying to send us their eventfd() file descriptor.
*/
if (sfd == epEvs[nfds].data.fd) {
if (EPOLLIN != epEvs[nfds].events) {
printErrMsg("unexpected condition on socket: %d",
epEvs[nfds].events);
nfds = -1;
break;
}
/*l
* Reset the sizes of the receive buffers to their
* actual value; on return they will be set to the
* read value.
*/
io.iov_len = sizeof(iovBuf);
msg.msg_controllen = sizeof(ctrBuf);
/*l
* Receive the waiting message.
*/
if (-1 == recvmsg(sfd, &msg, MSG_CMSG_CLOEXEC)) {
printVerbMsg("failed datagram read on socket");
/*l
* Verify that the message's control buffer contains
* a file descriptor.
*/
} else if ( NULL != (ctrMsg = CMSG_FIRSTHDR(&msg))
&& CMSG_LEN(sizeof(int)) == ctrMsg->cmsg_len
&& SOL_SOCKET == ctrMsg->cmsg_level
&& SCM_RIGHTS == ctrMsg->cmsg_type) {
/*l
* Unpack the file descriptor.
*/
memmove(&cfd, CMSG_DATA(ctrMsg), sizeof(cfd));
printVerbMsg("Received fd %d from client type %c%c%c",
cfd,
((char *)msg.msg_iov->iov_base)[0],
((char *)msg.msg_iov->iov_base)[1],
((char *)msg.msg_iov->iov_base)[2]);
/*l
* Process the incoming file descriptor and add
* it to the epoll() list.
*/
welcomeClient(efd, cfd);
/*l
* Note but ignore incorrectly formed datagrams.
*/
} else {
printVerbMsg("could not extract file descriptor "
"from client's datagram");
}
/*l
* The epoll() event is on one of the file descriptors
* shared with a client, process it.
*/
} else {
processClientEvent(efd, &epEvs[nfds]);
}
} while (nfds);
/*l
* If something happened to our socket break the epoll_wait()
* loop.
*/
if (nfds)
break;
}
}
/*l
* An error occurred, cleanup.
*/
if (-1 != efd)
close(efd);
if (-1 != sfd)
close(sfd);
return -1;
}
起初我尝试使用 eventfd() 而不是 pipe() 但 eventfd 文件描述符表示对象而不是连接,因此在客户端代码中关闭 fd 不会在 reaper 中产生 EPOLLHUP。如果有人为此知道 pipe() 的更好替代方法,请告诉我!
为了完整起见,这里是用于构造抽象地址的#defines:
/*d
* server abstract address.
*/
#define EPLS_SRV_NAM "_abssSrv"
#define EPLS_SRV_VER "0.0.1"
#define EPLS_SRV_ADDR EPLS_SRV_NAM "." EPLS_SRV_NAM
#define EPLS_MSG_LEN 3
#define EPLS_MAX_EPEVS 32
就是这样,希望对大家有用。