从 FIFO 读取的程序是否有任何方法可以检测到写入同一 FIFO 的程序已将其关闭?

Is there any way for a program reading from a FIFO to detect that a program writing to the same FIFO has closed it?

在 C 中使用 epoll,有什么方法可以让从 FIFO 读取的程序检测到写入同一 FIFO 的程序已将其关闭?

我以为这会生成一个 EPOLLHUP 事件,但显然它不会。

示例: 我有一个 reader 和一个作家。编写器创建并打开三个 FIFO,向其中写入一些浮点数,完成后将其关闭。 readers 打开 FIFO 并监视它们,打印出浮点数,并且应该在编写器关闭它们时关闭 FIFO(通过检查 EPOLLHUP 事件)。这是行不通的, reader 只是停留在无限循环中,直到终止。下面的代码,以及我的 reader 和作者的日志。

Reader代码:

#define _XOPEN_SOURCE 700

#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <time.h>

#define POLL_TIMEOUT 1000 * 5

struct fifo
{
    char *path;
    FILE *fp;
    int fd;
    struct epoll_event ev;
};

#define dbg_fmt(str, ...) {\
    time_t curr_time = time(NULL);\
    struct tm *curr_tm = gmtime(&curr_time);\
    fprintf(stderr, "[%d:%d:%d] "str"\n", curr_tm->tm_hour, curr_tm->tm_min, curr_tm->tm_sec, __VA_ARGS__);\
}

#define dbg(str) {\
    time_t curr_time = time(NULL);\
    struct tm *curr_tm = gmtime(&curr_time);\
    fprintf(stderr, "[%d:%d:%d] %s\n", curr_tm->tm_hour, curr_tm->tm_min, curr_tm->tm_sec, str);\
}

int main(void)
{
    int num_fifos = 3;
    int num_active_fifos = num_fifos;

    struct fifo *fifos = malloc(sizeof(struct fifo) * num_fifos);

    int epollfd = epoll_create1(0);
    struct epoll_event *epollev = calloc(num_fifos, sizeof(struct epoll_event));

    for(int i=0; i<num_fifos; i++)
    {
        fifos[i].path = malloc(sizeof(char) * strlen("fifoX"));
        sprintf(fifos[i].path, "fifo%d", i);

        dbg_fmt("opening %s", fifos[i].path);
        fifos[i].fp = fopen(fifos[i].path, "r+");
        fifos[i].fd = fileno(fifos[i].fp);

        fifos[i].ev.data.ptr = (void *)&(fifos[i]);
        fifos[i].ev.events = EPOLLIN;

        dbg_fmt("registering %s with epoll", fifos[i].path);
        epoll_ctl(epollfd, EPOLL_CTL_ADD, fifos[i].fd, &(fifos[i].ev));     
    }

    dbg("entering poll loop");

    while(num_active_fifos > 0)
    {
        dbg("waiting for event...");
        int num_events = epoll_wait(epollfd, epollev, num_fifos, POLL_TIMEOUT);\

        for(int i=0; i<num_events; i++)
        {
            struct fifo *curr_fifo = epollev[i].data.ptr;
            dbg_fmt("recieved event from %s", curr_fifo->path);

            if(epollev[i].events & EPOLLIN)
            {
                float fl;
                fscanf(curr_fifo->fp, "%f", &fl);
                dbg_fmt("read %f from %s", fl, curr_fifo->path);
            }

            if(epollev[i].events & (EPOLLHUP|EPOLLRDHUP|EPOLLERR)) {
                dbg_fmt("closing %s", curr_fifo->path);
                epoll_ctl(epollfd, EPOLL_CTL_DEL,curr_fifo->fd, NULL);
                fclose(curr_fifo->fp);
                num_active_fifos--;
            }
        }
    }

    for(int i=0; i<num_fifos; i++) {
        free(fifos[i].path);
    }

    close(epollfd);
    free(epollev);

    free(fifos);

    dbg("done");
    return 0;
}

作者代码:

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdio.h>
#include <fcntl.h>
#include <time.h>
#include <string.h>


#define dbg_fmt(str, ...) {\
    time_t curr_time = time(NULL);\
    struct tm *curr_tm = gmtime(&curr_time);\
    fprintf(stderr, "[%d:%d:%d] "str"\n", curr_tm->tm_hour, curr_tm->tm_min, curr_tm->tm_sec, __VA_ARGS__);\
}

#define dbg(str) {\
    time_t curr_time = time(NULL);\
    struct tm *curr_tm = gmtime(&curr_time);\
    fprintf(stderr, "[%d:%d:%d] %s\n", curr_tm->tm_hour, curr_tm->tm_min, curr_tm->tm_sec, str);\
}

int main(int argc, char **argv) {

    mkfifo("fifo0",0755);
    mkfifo("fifo1",0755);
    mkfifo("fifo2",0755);

    int fifo0 = open("fifo0",O_WRONLY);
    int fifo1 = open("fifo1",O_WRONLY);
    int fifo2 = open("fifo2",O_WRONLY);

    dbg("made FIFOs, sleeping for 5s");
    sleep(5);

    dbg("writing one value to each fifo (1.2, 0.5, -1.3)");

    write(fifo0, "1.2 ", strlen("1.2 "));
    write(fifo1, "0.5 ", strlen("0.5 "));
    write(fifo2, "-1.3 ", strlen("-1.3 "));

    dbg("sleeping for 5s");
    sleep(5);

    dbg("closing fifo2");
    close(fifo2);

    dbg("sleeping for 5s");
    sleep(5);

    dbg("writing two remaining values (-1.0 and 3.2) to fifo1 & closing it");

    write(fifo1, "-1.0 ", strlen("-1.0 "));
    write(fifo1, "3.2 ", strlen("3.2 "));
    close(fifo1);

    dbg("sleeping for 5s");
    sleep(5);

    dbg("server: writing remaining value (-2.5) to fifo0 & closing it");

    write(fifo0, "-2.5 ", strlen("-2.5 "));
    close(fifo0);

    dbg("done");
}

Reader 日志:

[18:42:3] opening fifo0
[18:42:3] registering fifo0 with epoll
[18:42:3] opening fifo1
[18:42:3] registering fifo1 with epoll
[18:42:3] opening fifo2
[18:42:3] registering fifo2 with epoll
[18:42:3] entering poll loop
[18:42:3] waiting for event...
[18:42:8] waiting for event...
[18:42:11] recieved event from fifo0
[18:42:11] read 1.200000 from fifo0
[18:42:11] waiting for event...
[18:42:11] recieved event from fifo1
[18:42:11] read 0.500000 from fifo1
[18:42:11] recieved event from fifo2
[18:42:11] read -1.300000 from fifo2
[18:42:11] waiting for event...
[18:42:16] waiting for event...
[18:42:21] recieved event from fifo1
[18:42:21] read -1.000000 from fifo1
[18:42:21] waiting for event...
[18:42:26] recieved event from fifo0
[18:42:26] read -2.500000 from fifo0
[18:42:26] waiting for event...
[18:42:31] waiting for event...
[18:42:36] waiting for event...
[18:42:41] waiting for event...
^C

作者日志:

[18:42:6] made FIFOs, sleeping for 5s
[18:42:11] writing one value to each fifo (1.2, 0.5, -1.3)
[18:42:11] sleeping for 5s
[18:42:16] closing fifo2
[18:42:16] sleeping for 5s
[18:42:21] writing two remaining values (-1.0 and 3.2) to fifo1 & closing it
[18:42:21] sleeping for 5s
[18:42:26] server: writing remaining value (-2.5) to fifo0 & closing it
[18:42:26] done

您的 reader 需要 fopen 模式为 r 的 FIFO,因为 r+ 正在读取 写入。所以,当 writer 关闭所有 FIFO 时,仍然有 reader 具有写入权限,并且不会生成 EPOLLHUP 信号。