为 epoll 线程重新准备文件描述符是否安全?

Is rearming file descriptors for epoll thread safe?

this question 我知道我可以在另一个线程阻塞 epoll_wait(2) 时调用 epoll_ctl(2)。不过我还有一个问题。

当使用带有 EPOLLONESHOT 标志的 epoll 时,只会触发一个事件,并且必须使用 epoll_ctl(2) 重新装备 fd。这是必要的,所以只有一个线程 将从 fd 读取并适当地处理结果。

以下是一个时间线,在一定程度上形象化了我假设的问题:

Thread1:                       Thread2:                  Kernel:
-----------------------------------------------------------------------
epoll_wait();
                                                         Receives chunk
dispatch chunk to thread 2
epoll_wait();                  Handle chunk
                               Still handle chunk        Receives chunk
                               Rearm fd for epoll
?

收到块后重新装备 fd 时,问号上会发生什么情况? epoll 会触发 EPOLLIN 事件,还是会无限期地阻塞,尽管套接字是可读的?我的架构是否合理?

您的体系结构是合理的,它会起作用:epoll 会将文件描述符标记为可读并触发 EPOLLIN 事件。

这方面的文档很少而且很微妙; man 7 epoll 的 Q/A 部分简要提到了这一点:

Q8 Does an operation on a file descriptor affect the already collected but not yet reported events?

A8 You can do two operations on an existing file descriptor. Remove would be meaningless for this case. Modify will reread available I/O.

您可以对现有文件描述符执行的两个操作(现有文件描述符是过去已添加到 epoll 集的文件描述符 - 这包括等待重新装备的文件描述符)是删除和修改。如联机帮助页所述,删除在这里没有意义,修改将重新评估文件描述符中的条件。

不过,没有什么能比得上真实世界的实验了。以下程序测试了这种边缘情况:

#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <assert.h>
#include <semaphore.h>
#include <sys/epoll.h>
#include <unistd.h>

static pthread_t tids[2];
static int epoll_fd;
static char input_buff[512];
static sem_t chunks_sem;

void *dispatcher(void *arg) {
    struct epoll_event epevent;

    while (1) {
        printf("Dispatcher waiting for more chunks\n");
        if (epoll_wait(epoll_fd, &epevent, 1, -1) < 0) {
            perror("epoll_wait(2) error");
            exit(EXIT_FAILURE);
        }

        ssize_t n;
        if ((n = read(STDIN_FILENO, input_buff, sizeof(input_buff)-1)) <= 0) {
            if (n < 0)
                perror("read(2) error");
            else
                fprintf(stderr, "stdin closed prematurely\n");
            exit(EXIT_FAILURE);
        }

        input_buff[n] = '[=10=]';
        sem_post(&chunks_sem);
    }

    return NULL;
}

void *consumer(void *arg) {
    sigset_t smask;
    sigemptyset(&smask);
    sigaddset(&smask, SIGUSR1);

    while (1) {
        sem_wait(&chunks_sem);
        printf("Consumer received chunk: %s", input_buff);
        /* Simulate some processing... */
        sleep(2);
        printf("Consumer finished processing chunk.\n");
        printf("Please send SIGUSR1 after sending more data to stdin\n");

        int signo;
        if (sigwait(&smask, &signo) < 0) {
            perror("sigwait(3) error");
            exit(EXIT_FAILURE);
        }

        assert(signo == SIGUSR1);

        struct epoll_event epevent;
        epevent.events = EPOLLIN | EPOLLONESHOT;
        epevent.data.fd = STDIN_FILENO;

        if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, STDIN_FILENO, &epevent) < 0) {
            perror("epoll_ctl(2) error when attempting to readd stdin");
            exit(EXIT_FAILURE);
        }

        printf("Readded stdin to epoll fd\n");
    }
}

int main(void) {

    sigset_t sigmask;
    sigfillset(&sigmask);
    if (pthread_sigmask(SIG_SETMASK, &sigmask, NULL) < 0) {
        perror("pthread_sigmask(3) error");
        exit(EXIT_FAILURE);
    }

    if ((epoll_fd = epoll_create(1)) < 0) {
        perror("epoll_create(2) error");
        exit(EXIT_FAILURE);
    }

    struct epoll_event epevent;
    epevent.events = EPOLLIN | EPOLLONESHOT;
    epevent.data.fd = STDIN_FILENO;

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, STDIN_FILENO, &epevent) < 0) {
        perror("epoll_ctl(2) error");
        exit(EXIT_FAILURE);
    }

    if (sem_init(&chunks_sem, 0, 0) < 0) {
        perror("sem_init(3) error");
        exit(EXIT_FAILURE);
    }

    if (pthread_create(&tids[0], NULL, dispatcher, NULL) < 0) {
        perror("pthread_create(3) error on dispatcher");
        exit(EXIT_FAILURE);
    }

    if (pthread_create(&tids[1], NULL, consumer, NULL) < 0) {
        perror("pthread_create(3) error on consumer");
        exit(EXIT_FAILURE);
    }

    size_t i;
    for (i = 0; i < sizeof(tids)/sizeof(tids[0]); i++) {
        if (pthread_join(tids[i], NULL) < 0) {
            perror("pthread_join(3) error");
            exit(EXIT_FAILURE);
        }
    }

    return 0;
}

它的工作原理如下:调度程序线程将 stdin 添加到 epoll 集,然后使用 epoll_wait(2)stdin 获取可读的输入。当输入到达时,调度程序唤醒工作线程,该线程打印输入并通过休眠 2 秒来模拟一些处理时间。同时,调度程序返回主循环并再次阻塞 epoll_wait(2)

工作线程不会重新武装 stdin,直到您通过发送它 SIGUSR1 来通知它。所以,我们只是在 stdin 中写入更多内容,然后将 SIGUSR1 发送到进程。工作线程收到信号,然后才重新武装 stdin - 到那时已经可以读取,并且调度程序已经在等待 epoll_wait(2).

您可以从输出中看到调度程序已正确唤醒并且一切正常:

Dispatcher waiting for more chunks
testing 1 2 3 // Input
Dispatcher waiting for more chunks // Dispatcher notified worker and is waiting again
Consumer received chunk: testing 1 2 3
Consumer finished processing chunk.
Please send SIGUSR1 after sending more data to stdin
hello world // Input
Readded stdin to epoll fd // Rearm stdin; dispatcher is already waiting
Dispatcher waiting for more chunks // Dispatcher saw new input and is now waiting again
Consumer received chunk: hello world
Consumer finished processing chunk.
Please send SIGUSR1 after sending more data to stdin