线程间共享同一个epoll文件描述符可以吗?
Is it OK to share the same epoll file descriptor among threads?
多个线程共享同一个epoll fd(不是socket fd)安全吗?如果是这样,每个线程是否必须将自己的事件数组传递给 epoll_wait(2)
或者他们可以共享它吗?
例如
void *thread_func(void *thread_args) {
// extract socket_fd, epoll_fd, &event, &events_array from
// thread_args
// epoll_wait() using epoll_fd and events_array received from main
// now all threads would be using same epoll_fd and events array
}
void main( void ) {
// create and bind to socket
// create events_fd
// allocate memory for events array
// subscribe to events EPOLLIN and EPOLLET
// pack the socket_fd, epoll_fd, &events, &events_array into
// thread_args struct.
// create multiple threads and pass thread_func and
// same thread_args to all threads
}
还是这样比较好:
void *thread_func(void *socket_fd) {
// create events_fd
// allocate memory for events array
// subscribe to events EPOLLIN and EPOLLET
// epoll_wait using own epoll_fd and events_array
// now all threads would have a separate epoll_fd with
// events populated on its own array
}
void main(void) {
// create and bind to socket
//create multiple threads and pass thread_func and socket_fd to
// all threads
}
是否有一个很好的例子说明如何在 C 中执行此操作?我在示例中看到 运行 main()
中的事件循环,并在检测到事件时生成一个新线程来处理请求。我想要做的是在程序开始时创建特定数量的线程,并让每个线程 运行 进入事件循环并处理请求。
Is it safe to share the same Epoll fd (not socket fd) among several
threads.
是的,它是安全的 - epoll(7)
接口是线程安全的 - 但这样做时你应该小心,你至少应该使用 EPOLLET
(边缘触发模式,而不是到默认级别触发)以避免其他线程中的虚假唤醒。这是因为当有新事件可供处理时,级别触发模式将唤醒每个线程。由于只有一个线程会处理它,这会不必要地唤醒大多数线程。
If shared epfd is used will each thread have to pass its own events
array or a shared events array to epoll_wait()
是的,您需要在每个线程上有一个单独的事件数组,否则您将遇到竞争条件并且可能会发生令人讨厌的事情。例如,您可能有一个线程仍在迭代由 epoll_wait(2)
编辑的事件 return 并在处理请求时突然另一个线程使用相同的数组调用 epoll_wait(2)
然后事件得到同时覆盖另一个线程正在读取它们。不好!您 绝对 需要为每个线程创建一个单独的数组。
假设每个线程都有一个单独的数组,两种可能性——等待同一个 epoll fd 或为每个线程有一个单独的 epoll fd——都同样有效,但请注意语义是不同的。使用全局共享的 epoll fd,每个线程等待来自 any client 的请求,因为 clients 都被添加到同一个 epoll fd。每个线程都有一个单独的 epoll fd,那么每个线程基本上负责客户端的一个子集(那些被该线程接受的客户端)。
这可能与您的系统无关,也可能会产生巨大的影响。例如,可能发生这样的情况:一个线程很不幸地得到了一组发出繁重和频繁请求的高级用户,导致该线程超负荷工作,而其他具有不那么积极的客户端的线程几乎处于空闲状态。那不是不公平吗?另一方面,也许你希望只有一些线程处理特定的 class 用户,在这种情况下,在每个线程上使用不同的 epoll fds 可能是有意义的。像往常一样,您需要考虑两种可能性,权衡取舍,考虑您的具体问题,然后做出决定。
下面是一个使用全局共享的epoll fd的例子。我原本不打算做所有这些,但事情接二连三,嗯,这很有趣,我认为它可以帮助你开始。它是一个监听端口 3000 的回显服务器,有一个 20 个线程池,使用 epoll 来同时接受新客户端和服务请求。
#include <stdio.h>
#include <stdlib.h>
#include <inttypes.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <assert.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#define SERVERPORT 3000
#define SERVERBACKLOG 10
#define THREADSNO 20
#define EVENTS_BUFF_SZ 256
static int serversock;
static int epoll_fd;
static pthread_t threads[THREADSNO];
int accept_new_client(void) {
int clientsock;
struct sockaddr_in addr;
socklen_t addrlen = sizeof(addr);
if ((clientsock = accept(serversock, (struct sockaddr *) &addr, &addrlen)) < 0) {
return -1;
}
char ip_buff[INET_ADDRSTRLEN+1];
if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) {
close(clientsock);
return -1;
}
printf("*** [%p] Client connected from %s:%" PRIu16 "\n", (void *) pthread_self(),
ip_buff, ntohs(addr.sin_port));
struct epoll_event epevent;
epevent.events = EPOLLIN | EPOLLET;
epevent.data.fd = clientsock;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientsock, &epevent) < 0) {
perror("epoll_ctl(2) failed attempting to add new client");
close(clientsock);
return -1;
}
return 0;
}
int handle_request(int clientfd) {
char readbuff[512];
struct sockaddr_in addr;
socklen_t addrlen = sizeof(addr);
ssize_t n;
if ((n = recv(clientfd, readbuff, sizeof(readbuff)-1, 0)) < 0) {
return -1;
}
if (n == 0) {
return 0;
}
readbuff[n] = '[=10=]';
if (getpeername(clientfd, (struct sockaddr *) &addr, &addrlen) < 0) {
return -1;
}
char ip_buff[INET_ADDRSTRLEN+1];
if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) {
return -1;
}
printf("*** [%p] [%s:%" PRIu16 "] -> server: %s", (void *) pthread_self(),
ip_buff, ntohs(addr.sin_port), readbuff);
ssize_t sent;
if ((sent = send(clientfd, readbuff, n, 0)) < 0) {
return -1;
}
readbuff[sent] = '[=10=]';
printf("*** [%p] server -> [%s:%" PRIu16 "]: %s", (void *) pthread_self(),
ip_buff, ntohs(addr.sin_port), readbuff);
return 0;
}
void *worker_thr(void *args) {
struct epoll_event *events = malloc(sizeof(*events)*EVENTS_BUFF_SZ);
if (events == NULL) {
perror("malloc(3) failed when attempting to allocate events buffer");
pthread_exit(NULL);
}
int events_cnt;
while ((events_cnt = epoll_wait(epoll_fd, events, EVENTS_BUFF_SZ, -1)) > 0) {
int i;
for (i = 0; i < events_cnt; i++) {
assert(events[i].events & EPOLLIN);
if (events[i].data.fd == serversock) {
if (accept_new_client() == -1) {
fprintf(stderr, "Error accepting new client: %s\n",
strerror(errno));
}
} else {
if (handle_request(events[i].data.fd) == -1) {
fprintf(stderr, "Error handling request: %s\n",
strerror(errno));
}
}
}
}
if (events_cnt == 0) {
fprintf(stderr, "epoll_wait(2) returned 0, but timeout was not specified...?");
} else {
perror("epoll_wait(2) error");
}
free(events);
return NULL;
}
int main(void) {
if ((serversock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
perror("socket(2) failed");
exit(EXIT_FAILURE);
}
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_port = htons(SERVERPORT);
serveraddr.sin_addr.s_addr = INADDR_ANY;
if (bind(serversock, (const struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) {
perror("bind(2) failed");
exit(EXIT_FAILURE);
}
if (listen(serversock, SERVERBACKLOG) < 0) {
perror("listen(2) failed");
exit(EXIT_FAILURE);
}
if ((epoll_fd = epoll_create(1)) < 0) {
perror("epoll_create(2) failed");
exit(EXIT_FAILURE);
}
struct epoll_event epevent;
epevent.events = EPOLLIN | EPOLLET;
epevent.data.fd = serversock;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, serversock, &epevent) < 0) {
perror("epoll_ctl(2) failed on main server socket");
exit(EXIT_FAILURE);
}
int i;
for (i = 0; i < THREADSNO; i++) {
if (pthread_create(&threads[i], NULL, worker_thr, NULL) < 0) {
perror("pthread_create(3) failed");
exit(EXIT_FAILURE);
}
}
/* main thread also contributes as worker thread */
worker_thr(NULL);
return 0;
}
一些注意事项:
main()
应该 return int
,而不是 void
(如您在示例中所示)
- 始终处理错误 return 代码。忽略它们是很常见的,当事情破裂时,很难知道发生了什么。
- 代码假设没有请求大于 511 字节(如
handle_request()
中的缓冲区大小所示)。如果请求大于此值,则可能某些数据会在套接字中保留很长时间,因为 epoll_wait(2)
不会报告它,直到该文件描述符上发生新事件(因为我们正在使用EPOLLET
)。在最坏的情况下,客户端可能永远不会真正发送任何新数据,永远等待回复。
- 为每个请求打印线程标识符的代码假定
pthread_t
是一个不透明的指针类型。的确,pthread_t
在Linux中是指针类型,但在其他平台可能是整数类型,所以这个是不可移植的。然而,这可能不是什么大问题,因为 epoll 是 Linux 特定的,所以代码无论如何都不可移植。
- 它假定当线程仍在为来自该客户端的请求提供服务时,没有来自同一客户端的其他请求到达。如果同时有一个新请求到达并且另一个线程开始为它服务,我们就会出现竞争条件,客户端不一定会按照他发送它们的相同顺序接收回显消息(但是,
write(2)
是原子的,所以虽然回复可能乱七八糟,就不穿插了)。
多个线程共享同一个epoll fd(不是socket fd)安全吗?如果是这样,每个线程是否必须将自己的事件数组传递给 epoll_wait(2)
或者他们可以共享它吗?
例如
void *thread_func(void *thread_args) {
// extract socket_fd, epoll_fd, &event, &events_array from
// thread_args
// epoll_wait() using epoll_fd and events_array received from main
// now all threads would be using same epoll_fd and events array
}
void main( void ) {
// create and bind to socket
// create events_fd
// allocate memory for events array
// subscribe to events EPOLLIN and EPOLLET
// pack the socket_fd, epoll_fd, &events, &events_array into
// thread_args struct.
// create multiple threads and pass thread_func and
// same thread_args to all threads
}
还是这样比较好:
void *thread_func(void *socket_fd) {
// create events_fd
// allocate memory for events array
// subscribe to events EPOLLIN and EPOLLET
// epoll_wait using own epoll_fd and events_array
// now all threads would have a separate epoll_fd with
// events populated on its own array
}
void main(void) {
// create and bind to socket
//create multiple threads and pass thread_func and socket_fd to
// all threads
}
是否有一个很好的例子说明如何在 C 中执行此操作?我在示例中看到 运行 main()
中的事件循环,并在检测到事件时生成一个新线程来处理请求。我想要做的是在程序开始时创建特定数量的线程,并让每个线程 运行 进入事件循环并处理请求。
Is it safe to share the same Epoll fd (not socket fd) among several threads.
是的,它是安全的 - epoll(7)
接口是线程安全的 - 但这样做时你应该小心,你至少应该使用 EPOLLET
(边缘触发模式,而不是到默认级别触发)以避免其他线程中的虚假唤醒。这是因为当有新事件可供处理时,级别触发模式将唤醒每个线程。由于只有一个线程会处理它,这会不必要地唤醒大多数线程。
If shared epfd is used will each thread have to pass its own events array or a shared events array to epoll_wait()
是的,您需要在每个线程上有一个单独的事件数组,否则您将遇到竞争条件并且可能会发生令人讨厌的事情。例如,您可能有一个线程仍在迭代由 epoll_wait(2)
编辑的事件 return 并在处理请求时突然另一个线程使用相同的数组调用 epoll_wait(2)
然后事件得到同时覆盖另一个线程正在读取它们。不好!您 绝对 需要为每个线程创建一个单独的数组。
假设每个线程都有一个单独的数组,两种可能性——等待同一个 epoll fd 或为每个线程有一个单独的 epoll fd——都同样有效,但请注意语义是不同的。使用全局共享的 epoll fd,每个线程等待来自 any client 的请求,因为 clients 都被添加到同一个 epoll fd。每个线程都有一个单独的 epoll fd,那么每个线程基本上负责客户端的一个子集(那些被该线程接受的客户端)。
这可能与您的系统无关,也可能会产生巨大的影响。例如,可能发生这样的情况:一个线程很不幸地得到了一组发出繁重和频繁请求的高级用户,导致该线程超负荷工作,而其他具有不那么积极的客户端的线程几乎处于空闲状态。那不是不公平吗?另一方面,也许你希望只有一些线程处理特定的 class 用户,在这种情况下,在每个线程上使用不同的 epoll fds 可能是有意义的。像往常一样,您需要考虑两种可能性,权衡取舍,考虑您的具体问题,然后做出决定。
下面是一个使用全局共享的epoll fd的例子。我原本不打算做所有这些,但事情接二连三,嗯,这很有趣,我认为它可以帮助你开始。它是一个监听端口 3000 的回显服务器,有一个 20 个线程池,使用 epoll 来同时接受新客户端和服务请求。
#include <stdio.h>
#include <stdlib.h>
#include <inttypes.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <assert.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#define SERVERPORT 3000
#define SERVERBACKLOG 10
#define THREADSNO 20
#define EVENTS_BUFF_SZ 256
static int serversock;
static int epoll_fd;
static pthread_t threads[THREADSNO];
int accept_new_client(void) {
int clientsock;
struct sockaddr_in addr;
socklen_t addrlen = sizeof(addr);
if ((clientsock = accept(serversock, (struct sockaddr *) &addr, &addrlen)) < 0) {
return -1;
}
char ip_buff[INET_ADDRSTRLEN+1];
if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) {
close(clientsock);
return -1;
}
printf("*** [%p] Client connected from %s:%" PRIu16 "\n", (void *) pthread_self(),
ip_buff, ntohs(addr.sin_port));
struct epoll_event epevent;
epevent.events = EPOLLIN | EPOLLET;
epevent.data.fd = clientsock;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientsock, &epevent) < 0) {
perror("epoll_ctl(2) failed attempting to add new client");
close(clientsock);
return -1;
}
return 0;
}
int handle_request(int clientfd) {
char readbuff[512];
struct sockaddr_in addr;
socklen_t addrlen = sizeof(addr);
ssize_t n;
if ((n = recv(clientfd, readbuff, sizeof(readbuff)-1, 0)) < 0) {
return -1;
}
if (n == 0) {
return 0;
}
readbuff[n] = '[=10=]';
if (getpeername(clientfd, (struct sockaddr *) &addr, &addrlen) < 0) {
return -1;
}
char ip_buff[INET_ADDRSTRLEN+1];
if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) {
return -1;
}
printf("*** [%p] [%s:%" PRIu16 "] -> server: %s", (void *) pthread_self(),
ip_buff, ntohs(addr.sin_port), readbuff);
ssize_t sent;
if ((sent = send(clientfd, readbuff, n, 0)) < 0) {
return -1;
}
readbuff[sent] = '[=10=]';
printf("*** [%p] server -> [%s:%" PRIu16 "]: %s", (void *) pthread_self(),
ip_buff, ntohs(addr.sin_port), readbuff);
return 0;
}
void *worker_thr(void *args) {
struct epoll_event *events = malloc(sizeof(*events)*EVENTS_BUFF_SZ);
if (events == NULL) {
perror("malloc(3) failed when attempting to allocate events buffer");
pthread_exit(NULL);
}
int events_cnt;
while ((events_cnt = epoll_wait(epoll_fd, events, EVENTS_BUFF_SZ, -1)) > 0) {
int i;
for (i = 0; i < events_cnt; i++) {
assert(events[i].events & EPOLLIN);
if (events[i].data.fd == serversock) {
if (accept_new_client() == -1) {
fprintf(stderr, "Error accepting new client: %s\n",
strerror(errno));
}
} else {
if (handle_request(events[i].data.fd) == -1) {
fprintf(stderr, "Error handling request: %s\n",
strerror(errno));
}
}
}
}
if (events_cnt == 0) {
fprintf(stderr, "epoll_wait(2) returned 0, but timeout was not specified...?");
} else {
perror("epoll_wait(2) error");
}
free(events);
return NULL;
}
int main(void) {
if ((serversock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
perror("socket(2) failed");
exit(EXIT_FAILURE);
}
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_port = htons(SERVERPORT);
serveraddr.sin_addr.s_addr = INADDR_ANY;
if (bind(serversock, (const struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) {
perror("bind(2) failed");
exit(EXIT_FAILURE);
}
if (listen(serversock, SERVERBACKLOG) < 0) {
perror("listen(2) failed");
exit(EXIT_FAILURE);
}
if ((epoll_fd = epoll_create(1)) < 0) {
perror("epoll_create(2) failed");
exit(EXIT_FAILURE);
}
struct epoll_event epevent;
epevent.events = EPOLLIN | EPOLLET;
epevent.data.fd = serversock;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, serversock, &epevent) < 0) {
perror("epoll_ctl(2) failed on main server socket");
exit(EXIT_FAILURE);
}
int i;
for (i = 0; i < THREADSNO; i++) {
if (pthread_create(&threads[i], NULL, worker_thr, NULL) < 0) {
perror("pthread_create(3) failed");
exit(EXIT_FAILURE);
}
}
/* main thread also contributes as worker thread */
worker_thr(NULL);
return 0;
}
一些注意事项:
main()
应该 returnint
,而不是void
(如您在示例中所示)- 始终处理错误 return 代码。忽略它们是很常见的,当事情破裂时,很难知道发生了什么。
- 代码假设没有请求大于 511 字节(如
handle_request()
中的缓冲区大小所示)。如果请求大于此值,则可能某些数据会在套接字中保留很长时间,因为epoll_wait(2)
不会报告它,直到该文件描述符上发生新事件(因为我们正在使用EPOLLET
)。在最坏的情况下,客户端可能永远不会真正发送任何新数据,永远等待回复。 - 为每个请求打印线程标识符的代码假定
pthread_t
是一个不透明的指针类型。的确,pthread_t
在Linux中是指针类型,但在其他平台可能是整数类型,所以这个是不可移植的。然而,这可能不是什么大问题,因为 epoll 是 Linux 特定的,所以代码无论如何都不可移植。 - 它假定当线程仍在为来自该客户端的请求提供服务时,没有来自同一客户端的其他请求到达。如果同时有一个新请求到达并且另一个线程开始为它服务,我们就会出现竞争条件,客户端不一定会按照他发送它们的相同顺序接收回显消息(但是,
write(2)
是原子的,所以虽然回复可能乱七八糟,就不穿插了)。