select() + 非阻塞 write() 是否可以在阻塞管道或套接字上使用?

Is select() + non-blocking write() possible on a blocking pipe or socket?

情况是我有一个阻塞管道或套接字fd,我想write()不阻塞,所以我先做一个select() ,但这仍然不能保证 write() 不会阻塞。

这是我收集的数据。即使 select() 表示 写入是可能的,写入超过 PIPE_BUF 字节可能会阻塞。 但是,最多写入 PIPE_BUF 字节似乎不会阻塞 实践,但这不是 POSIX spec.

的强制要求

那只指定了原子行为。 Python(!) documentation 指出:

Files reported as ready for writing by select(), poll() or similar interfaces in this module are guaranteed to not block on a write of up to PIPE_BUF bytes. This value is guaranteed by POSIX to be at least 512.

在下面的测试程序中,设置BUF_BYTES表示100000阻塞 write() 在 Linux、FreeBSD 或 Solaris 上成功 select。我 假设命名管道与匿名管道具有相似的行为。

不幸的是,阻塞套接字也会发生同样的情况。称呼 test_socket() in main() 并使用较大的 BUF_BYTES100000 很好 这里也)。目前还不清楚是否有像这样的安全缓冲区大小 PIPE_BUF 用于套接字。

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <limits.h>
#include <stdio.h>
#include <sys/select.h>
#include <unistd.h>

#define BUF_BYTES PIPE_BUF
char buf[BUF_BYTES];

int
probe_with_select(int nfds, fd_set *readfds, fd_set *writefds,
                  fd_set *exceptfds)
{
    struct timeval timeout = {0, 0};
    int n_found = select(nfds, readfds, writefds, exceptfds, &timeout);
    if (n_found == -1) {
        perror("select");
    }
    return n_found;
}

void
check_if_readable(int fd)
{
    fd_set fdset;
    FD_ZERO(&fdset);
    FD_SET(fd, &fdset);
    printf("select() for read on fd %d returned %d\n",
           fd, probe_with_select(fd + 1, &fdset, 0, 0));
}

void
check_if_writable(int fd)
{
    fd_set fdset;
    FD_ZERO(&fdset);
    FD_SET(fd, &fdset);
    int n_found = probe_with_select(fd + 1, 0, &fdset, 0);
    printf("select() for write on fd %d returned %d\n", fd, n_found);
    /* if (n_found == 0) { */
    /*     printf("sleeping\n"); */
    /*     sleep(2); */
    /*     int n_found = probe_with_select(fd + 1, 0, &fdset, 0); */
    /*     printf("retried select() for write on fd %d returned %d\n",  */
    /*            fd, n_found); */
    /* } */
}

void
test_pipe(void)
{
    int pipe_fds[2];
    size_t written;
    int i;
    if (pipe(pipe_fds)) {
        perror("pipe failed");
        _exit(1);
    }
    printf("read side pipe fd: %d\n", pipe_fds[0]);
    printf("write side pipe fd: %d\n", pipe_fds[1]);
    for (i = 0; ; i++) {
        printf("i = %d\n", i);
        check_if_readable(pipe_fds[0]);
        check_if_writable(pipe_fds[1]);
        written = write(pipe_fds[1], buf, BUF_BYTES);
        if (written == -1) {
            perror("write");
            _exit(-1);
        }
        printf("written %d bytes\n", written);
    }
}

void
serve()
{
    int listenfd = 0, connfd = 0;
    struct sockaddr_in serv_addr;

    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    memset(&serv_addr, '0', sizeof(serv_addr));

    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_addr.sin_port = htons(5000);

    bind(listenfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));

    listen(listenfd, 10);

    connfd = accept(listenfd, (struct sockaddr*)NULL, NULL);

    sleep(10);
}

int
connect_to_server()
{
    int sockfd = 0, n = 0;
    struct sockaddr_in serv_addr;

    if((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("socket");
        exit(-1);
    }

    memset(&serv_addr, '0', sizeof(serv_addr));

    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(5000);

    if(inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr) <= 0) {
        perror("inet_pton");
        exit(-1);
    }

    if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
        perror("connect");
        exit(-1);
    }

    return sockfd;
}

void
test_socket(void)
{
    if (fork() == 0)  {
        serve();
    } else {
        int fd;
        int i;
        int written;
        sleep(1);
        fd = connect_to_server();

        for (i = 0; ; i++) {
            printf("i = %d\n", i);
            check_if_readable(fd);
            check_if_writable(fd);
            written = write(fd, buf, BUF_BYTES);
            if (written == -1) {
                perror("write");
                _exit(-1);
            }
            printf("written %d bytes\n", written);
        }
    }
}

int
main(void)
{
    test_pipe();
    /* test_socket(); */
}

您引用的 Posix 部分明确指出:

[for pipes] If the O_NONBLOCK flag is clear, a write request may cause the thread to block, but on normal completion it shall return nbyte.

[for streams, which presumably includes streaming sockets] If O_NONBLOCK is clear, and the STREAM cannot accept data (the STREAM write queue is full due to internal flow control conditions), write() shall block until data can be accepted.

因此,您引用的 Python 文档只能适用于非阻塞模式。但是,由于您没有使用 Python,它无论如何都没有相关性。

除非您希望在 select() 表示 fd 已准备好写入时一次发送一个字节,否则实际上没有办法知道您将能够发送多少,即便如此理论上可能(至少在文档中,如果不是在现实世界中) select 说它已准备好写入,然后条件在 select() 和 write() 之间的时间发生变化.

非阻塞发送是这里的解决方案,如果您从使用 write() 更改为 send(),则无需将文件描述符更改为非阻塞模式即可以非阻塞形式发送一条消息。您唯一需要更改的是将 MSG_DONTWAIT 标志添加到发送调用,这将使发送非阻塞而不改变套接字的属性。在这种情况下,您甚至根本不需要使用 select(),因为 send() 调用将为您提供 return 代码中所需的所有信息 - 如果您得到 return -1 的代码和 errno 是 EAGAIN 或 EWOULDBLOCK 然后你知道你不能再发送了。

ckolivas 的回答是正确的,但是在阅读这篇文章后 post,我想我可以为了兴趣添加一些测试数据。

我很快写了一个读取速度慢的 tcp 服务器(读取之间休眠 100 毫秒),每个周期读取 4KB。然后是一个快速编写的客户端,我用它来测试各种写入场景。两者都在读取(服务器)或写入(客户端)之前使用 select。

这是在 Linux Mint 18 运行 下 Windows 7 VM (VirtualBox) 分配的 1GB 内存。

对于阻塞情况

如果 "certain number of bytes" 的写入成为可能,则 select returned 并且写入要么立即全部完成,要么阻塞直到完成。在我的系统上,这个 "certain number of bytes" 至少是 1MB。在 OP 的系统上,这显然要少得多(少于 100,000)。

所以 select 而不是 return 直到可以写入至少 1MB。如果较小的写入随后会阻塞,则从来没有(我看到过)select 会 return 的情况。因此 select + write(x) where x was 4K or 8K or 128K never write blocked on this system.

这当然很好,但这是一个具有 1GB 内存的未加载 VM。预计其他系统会有所不同。但是,我 期望 在 select 之后发布的某个幻数(可能 PIPE_BUF )下方写入,永远不会阻塞所有 POSIX 合规系统。但是(再一次)我没有看到任何关于这种效果的文档,所以不能依赖这种行为(即使 Python 文档显然是这样)。正如 OP 所说,尚不清楚套接字是否有像 PIPE_BUF 这样的安全缓冲区大小。可惜了。

这就是 ckolivas 的 post 所说的,尽管我认为当只有一个字节可用时,没有理性的系统会 return 来自 select!

补充信息:

在任何时候(在正常操作中)都没有写入 return 除了请求的全部金额(或错误)之外的任何内容。

如果服务器被终止 (ctrl-c),客户端写入将立即 return 一个值(通常小于请求的值 - 没有正常操作!)没有其他错误指示​​。下一个 select 调用将立即 return,随后的写入将 return -1,错误号为 "Connection reset by peer"。这就是人们所期望的 - 这次尽可能多地写,下次失败。

这(和 EINTR)似乎是唯一一次写入 return 大于 0 但小于要求的数字。

如果服务器端正在读取而客户端被终止,则服务器会继续读取所有可用数据,直到 运行 结束。然后它读取一个零并关闭套接字。

对于非阻塞情况:

低于某个魔法值的行为与上面相同。 select returns,写入不会阻塞(当然)并且写入全部完成。

我的问题是否则会发生什么。 send(2) 手册页说在非阻塞模式下,发送失败并返回 EAGAIN 或 EWOULDBLOCK。这可能意味着(取决于你如何阅读)它是全有或全无。除了它还说 select 可用于确定何时可以发送更多数据。所以不能全有或全无。

Write(与不带标志的发送相同)表示它可以 return 少于请求。这种挑剔似乎很迂腐,但手册页是福音,所以我就这样阅读了它们。

在测试中,非阻塞写入的值大于某个特定值 returned 小于请求的值。这个值不是常量,它从写入到写入都在变化,但它总是非常大(> 1 到 2MB)。