C - epoll 无限期地报告 EPOLLIN 为 1-way closed socket

C - epoll reports EPOLLIN indefinitely for 1-way closed socket

我正在使用 C 中的 epoll 实现一个 HTTP 服务器。服务器解析请求并做出相应的响应。

当我发出简单的 curl 请求时一切正常(即没有并发)。

然而,当使用并发客户端并发出多个请求时,即使 ab 报告所有响应都成功(事实上,它们是成功的,因为报告的传输字节似乎是正确的),服务器保留最后一个1或2个client fd打开,循环到epoll_wait无限向最后一个client fd报告EPOLLIN。如果我在那个 FD 中尝试 recv,我总是得到 0,这意味着客户端关闭了连接。

我很好奇为什么这种情况只发生 一些 次而不总是发生,为什么只发生在最后一个剩余的客户端 fd 上。

代码位于https://github.com/agis-/rea/tree/epoll,但为了完整起见,我也将其粘贴在这里(稍后我将给出具体的重现步骤):

rea.c:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <err.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <signal.h>
#include <netdb.h>
#include <sys/epoll.h>
#include "rea.h"

fd_set rfds;
fd_set wfds;
Server *server;
Client *clients[MAX_CLIENTS];
int epfd;
int cheat;
int responds;
int requests;


int main(int argc, char *argv[])
{
    int status, i, fd, added;
    Client *c;

    if (argc != 2) {
        fprintf(stderr, "Usage: %s <port>\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    setupAndListen(argv[1]);

    // epoll
    //
    struct epoll_event ev;
    struct epoll_event evs[MAX_EP_EVENTS];
    int nfds;
    epfd = epoll_create1(0);
    if (epfd == -1) {
        err(EXIT_FAILURE, "epoll_create error");
    }

    //deleteme
    int maxclients =0;

    ev.events = EPOLLIN;
    ev.data.ptr = server;

    status = epoll_ctl(epfd, EPOLL_CTL_ADD, server->fd, &ev);
    if (status == -1) {
        printf("%d\n", server->fd);
        err(EXIT_FAILURE, "epoll_ctl error");
    }

    while(1) {
        nfds = epoll_wait(epfd, evs, MAX_EP_EVENTS, -1);
        if (nfds == -1) {
            err(EXIT_FAILURE, "epoll_wait error");
        }
        printf("nfds: %d | on_message_complete: %d | responds: %d | request reads: %d\n", nfds, cheat, responds, requests);

        for (i = 0; i < nfds; i++) {
            if (((Server *)evs[i].data.ptr)->fd == server->fd) {
                fd = accept4(server->fd, server->addr->ai_addr,
                    &(server->addr->ai_addrlen), SOCK_NONBLOCK);
                if (fd < 0) {
                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
                        continue;
                    }
                    err(EXIT_FAILURE, "Socket accept error");
                }

                added = 0;
                for (i = 0; i < MAX_CLIENTS; i++) {
                    if (clients[i] == 0) {
                        clients[i] = makeClient(fd);
                        added = 1;
                        break;
                    }
                }
                if (!added) {
                    fprintf(stderr, "Could not find room for client fd: %d\n", fd);
                }

                ev.events = EPOLLIN;
                ev.data.ptr = clients[i];
                status = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
                if (status == -1) {
                    printf("client fd: %d\n", fd);
                    err(EXIT_FAILURE, "epoll_ctl error");
                }

                printf("Accepted connection! (fd: %d)(%p)\n", clients[i]->fd, clients[i]);
                printf("clients after accept: ");
                for (i=0;i < MAX_CLIENTS; i++) {
                    if (clients[i]) {
                        printf("%d(%p) ", clients[i]->fd, clients[i]);
                    }
                }
                printf("\n");
            } else {
                c = (Client *)evs[i].data.ptr;

                if ((evs[i].events & EPOLLIN) && c->cstate == CONNECTED) {
                    c->ev_delivered += 1;
                    if (c->ev_delivered > 1) {
                        printf("2 EVENTS!\n");
                        sleep(1);
                        //exit(1);
                    }
                    readRequest(c);
                } else if ((evs[i].events & EPOLLOUT)) {
                    respond(c);
                } else {
                    if (evs[i].events & EPOLLIN) {
                        printf("foo\n");
                        printf("client EPOLLIN %d(%p) %d\n", c->fd, c);
                    }
                    if (evs[i].events & EPOLLOUT) {
                        printf("client EPOLLOUT %d(%p)\n", c->fd, c);
                    }
                    if (evs[i].events & EPOLLERR) {
                        printf("client EPOLLERR %d(%p)\n", c->fd, c);
                    }
                    if (evs[i].events & EPOLLHUP) {
                        printf("client EPOLLHUP %d(%p)\n", c->fd, c);
                    }
                    c->ev_delivered += 1;

                    if (c->ev_delivered > 1) {
                        printf("2 EVENTS!\n");
                        sleep(1);
                        //exit(1);
                    }

                    for (i=0; i<MAX_CLIENTS;i++) {
                        if (clients[i]) {
                            maxclients += 1;
                        }
                    }
                    printf("nfds: %d | on_message_complete: %d | responds: %d | request reads: %d | maxclients: %d | c->replied: %d\n", nfds, cheat, responds, requests, maxclients, c->replied);
                    printf("clients after accept: ");
                    for (i=0;i < MAX_CLIENTS; i++) {
                        if (clients[i]) {
                            printf("%d(%p) ", clients[i]->fd, clients[i]);
                        }
                    }
                    printf("\n");
                    //exit(1);
                }
            }
        }
    }
}


void setupAndListen(char *port)
{
    int status, fd;
    struct addrinfo hints;
    struct addrinfo *ai;

    server = (Server *)malloc(sizeof(Server));
    if (!server) {
        fprintf(stderr, "Couldn't allocate memory for starting the server\n");
        exit(EXIT_FAILURE);
    }

    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET; /* Only IPv4 for now */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE; /* Listen on all network addresses */

    if ((status = getaddrinfo(NULL, port, &hints, &ai)) != 0) {
        fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(status));
        exit(EXIT_FAILURE);
    }

    /*
     * Normally only a single protocol exists to support a particular
     * socket [type, protocol family] combination, so we can skip specifying it
     */
    fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (fd < 0) {
        err(EXIT_FAILURE, "Socket creation error");
    }

    status = bind(fd, ai->ai_addr, ai->ai_addrlen);
    if (status != 0) {
        err(EXIT_FAILURE, "Socket bind error");
    }

    status = listen(fd, RECV_BACKLOG);
    if (status != 0) {
        err(EXIT_FAILURE, "Socket listen error");
    }

    server->fd = fd;
    server->addr = ai;

    setupSighandlers();

    printf("Listening on 0.0.0.0:%s ...\n", port);
}


int on_message_complete_cb(http_parser *p)
{
    struct epoll_event ev;
    Client *c = p->data;

    c->pstate = SUCCESS;
    ev.events = EPOLLOUT;
    ev.data.ptr = c;
    cheat += 1;

    if (epoll_ctl(epfd, EPOLL_CTL_MOD, c->fd, &ev) == -1) {
        err(EXIT_FAILURE, "epoll_ctl error");
    }
    printf("on msg complete called for client %d(%p)\n", c->fd, c);

    return 0;
}


void readRequest(Client *c)
{
    int nparsed, status;
    struct epoll_event ev;

    // TODO: remove me
    int i;

    requests += 1;

    status = recv(c->fd, c->buf, RECV_BUFFER, 0);

    if (status < 0 && errno != EAGAIN && errno != EWOULDBLOCK ) {
        // TODO: Leave this on until we work on the possible errors
        // from recv. In the future we should handle them.
        err(EXIT_FAILURE, "Message recv error (client: %d)\n", c->fd);
    } else {
        if (status == 0) {
            printf("Client %d(%p) closed the connection.\n", c->fd, c);
            printf("clients after readRequest: ");
            for (i=0;i < MAX_CLIENTS; i++) {
                if (clients[i]) {
                    printf("%d(%p) ", clients[i]->fd, clients[i]);
                }
            }
            printf("\n");

            c->cstate = DISCONNECTED;
        }

        nparsed = http_parser_execute(c->parser, c->parser_settings, c->buf, status);

        if (nparsed != status) {
            c->pstate = ERROR;

            ev.events = EPOLLOUT;
            ev.data.ptr = c;

            if (epoll_ctl(epfd, EPOLL_CTL_MOD, c->fd, &ev) == -1) {
                err(EXIT_FAILURE, "epoll_ctl error");
            }

            printf("Parse error (client %d): %s\n",
                    c->fd, http_errno_description(HTTP_PARSER_ERRNO(c->parser)));
        }
    }
}


void respond(Client *c)
{
    int status;
    int i;
    char *resp = "HTTP/1.1 200 OK\r\nContent-Length: 4\r\n\r\nCool\r\n\r\n";
    char *resp400 = "HTTP/1.1 400 Bad Request\r\n\r\n";
    printf("respond's client: %d(%p)\n", c->fd, c);

    responds += 1;

    printf("clients after respond: ");
    for (i=0;i < MAX_CLIENTS; i++) {
        if (clients[i]) {
            printf("%d(%p) ", clients[i]->fd, clients[i]);
        }
    }
    printf("\n");

    if (c->replied) {
        printf("ERROR: already replied");
        exit(1);
    }

    if (c->pstate == ERROR) {
        printf("ERROR: %d\n", c->fd);
        status = send(c->fd, resp400, strlen(resp400), 0);
        if (status == -1) {
            err(EXIT_FAILURE, "send error (client: %d)", c->fd);
        }
        c->replied = 1;
        status = epoll_ctl(epfd, EPOLL_CTL_DEL, c->fd, NULL);
        if (status == -1) {
            printf("%d\n", c->fd);
            err(EXIT_FAILURE, "epoll_ctl error");
        }
        closeClient(c);
    } else if (c->pstate == SUCCESS) {
        status = send(c->fd, resp, strlen(resp), 0);
        if (status == -1) {
            printf("ccc\n");
            err(EXIT_FAILURE, "send error! (client: %d)", c->fd);
        }
        c->replied = 1;
        status = epoll_ctl(epfd, EPOLL_CTL_DEL, c->fd, NULL);
        if (status == -1) {
            printf("%d\n", c->fd);
            err(EXIT_FAILURE, "epoll_ctl error");
        }
        closeClient(c);
    } else {
        printf("didn't respond at all to this client fd: %p\n", c);
        exit(1);
    }
}


Client *makeClient(int fd)
{
    http_parser_settings *settings = malloc(sizeof(http_parser_settings));
    http_parser *parser = malloc(sizeof(http_parser));

    http_parser_settings_init(settings);
    http_parser_init(parser, HTTP_REQUEST);

    settings->on_message_complete = on_message_complete_cb;

    Client *c = malloc(sizeof(Client));
    if (!c) {
        fprintf(stderr, "Couldn't allocate memory for connection %d\n", fd);
        exit(EXIT_FAILURE);
    }

    c->fd = fd;
    c->cstate = CONNECTED;
    c->pstate = IN_PROGRESS;
    c->replied = 0;
    c->ev_delivered = 0;
    memset(c->buf, 0, RECV_BUFFER);
    c->parser_settings = settings;
    c->parser = parser;
    c->parser->data = c;

    return c;
}


void closeClient(Client *c)
{
    int i, found;

    if (close(c->fd) < 0) {
        err(EXIT_FAILURE, "close(2) error");
    }

    found = 0;
    for (i = 0; i < MAX_CLIENTS; i++) {
        if (clients[i] && clients[i] == c) {
            clients[i] = 0;
            found = 1;
            break;
        }
    }

    if (found != 1) {
        err(EXIT_FAILURE, "Couldn't find client fd to close");
    }
    printf("clients after close %d(%p):", c->fd, c);
    for (i=0;i < MAX_CLIENTS; i++) {
        if (clients[i]) {
            printf("%d(%p)", clients[i]->fd, clients[i]);
        }
    }
    printf("\n");

    free(c);
}


void setupSighandlers(void)
{
    struct sigaction act;
    act.sa_handler = shutdownServer;

    int status = sigaction(SIGINT, &act, NULL);
    if (status != 0) {
        err(EXIT_FAILURE, "Error setting up signal handler\n");
    }
}


void shutdownServer(int sig)
{
    printf("\nShutting down...\n");
    int maxclients = 0;
    int i;

    int status = close(server->fd);
    if (status != 0) {
        err(EXIT_FAILURE, "Socket cleanup error");
    }

    freeaddrinfo(server->addr);
    for (i=0; i<MAX_CLIENTS;i++) {
        if (clients[i]) {
            maxclients += 1;
        }
    }
    printf("on_message_complete: %d | responds: %d | request reads: %d | connected clients: %d\n", cheat, responds, requests,maxclients);
    printf("Goodbye!\n");
    exit(EXIT_SUCCESS);
} 

复制步骤

  1. 首先,下载并运行服务器:

    $ git clone https://github.com/agis-/rea.git
    $ cd rea && make && ./rea 8005
    
  2. 然后从另一个终端尝试发出多个没有并发的请求:

    $ ab -n1000 -c1 http://0.0.0.0:8005/foo
    

    到目前为止一切正常,服务器报告没有活动的客户端文件描述符:

    clients after respond: 5(0x9e101e0) 
    clients after close 5(0x9e101e0):
                                     ^^^^^ no clients
    

    这也可以通过 运行 宁一个 lsof 来验证,并看到除了服务器的侦听套接字之外没有活动的 TCP 套接字。

  3. 然后尝试从 ab:

    发出多个请求
    $ ab -n1000 -c10 http://0.0.0.0:8005/foo
    

    你会看到这样的东西不断打印:

    clients after accept: 5(0x926aa40) 11(0x9271d50)
                          # ^^^^^^^^ clients are still around 
    nfds: 2 | on_message_complete: 6000 | responds: 6000 | request reads: 6002
    client EPOLLIN 5(0x926aa40) 6000
    2 EVENTS! # means we accepted 2 events for the same fd
    

    (如果您没有得到类似的输出并且一切正常,请再次尝试 运行ning ab 直到您得到类似的结果)。这意味着客户端 FD 5 收到了 EPOLLIN 超过 1 次。这也意味着即使我们总共响应了 6000 个请求(我们做了 6000 send(2)s),总共 read(2)s 其中 6002。这也意味着仍然有 2 个客户端 FD 在进程.

关于为什么会发生这种情况有什么想法吗?提前致谢!

如果 recv() returns 为零,则对等方肯定已关闭连接,您也应该关闭连接,并停止对其进行轮询。