使用异步 I/O 的消息排序 (epoll)

Message Ordering with Asynchronous I/O (epoll)

说我已经实现了一个基于 epoll 的 TCP 服务器,其中每个线程都是 运行 与下面非常相似的东西(取自 epoll 联机帮助页,其中 kdpfd 是 epoll 文件描述符,监听器是一个套接字正在侦听端口):

struct epoll_event ev, *events;
for(;;) {
    nfds = epoll_wait(kdpfd, events, maxevents, -1);
    for(n = 0; n < nfds; ++n) {
        if(events[n].data.fd == listener) {
            client = accept(listener, (struct sockaddr *) &local,
                            &addrlen);
            if(client < 0){
                perror("accept");
                continue;
            }
            setnonblocking(client);
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = client;
            if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) {
                fprintf(stderr, "epoll set insertion error: fd=%d0,
                        client);
                return -1;
            }
        }
        else
            do_use_fd(events[n].data.fd);
    }
}

对于上面的 do_use_fd(events[n].data.fd),假设我们想将收到的所有内容写入标准输出:

int do_use_fd(int fd) {
    int err;
    char buf[512];
    while ((err = read(fd, buf, 512)) > 0) {
        write(1, buf, err);
    }

    if (err == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
       // do some error handling and return -1

    return 0;
}

现在,假设我有 10k+ 连接,所有人都在很长一段时间内向我发送了很多消息。假设我的客户每隔几秒向我发送消息 hello, my name is {client's name}。假设(不知何故)此消息足够大,必须作为多个数据包传输。

因此,read(fd, buf, 512) 可能偶尔 return -1 带有一个 errno 表示它会阻塞。因此,我认为上述解决方案最终可能会得到如下输出:

hello, my nam
hello, my name is Pau
e is John Le
hello, my name is Geo
nnon
l McCartney
rge
hello, my name is Ringo
Starr
 Harrison

因为一旦读取在一个连接上阻塞,另一个读取就可以在不同的连接上开始。相反,我希望打印以下内容:

hello, my name is John Lennon
hello, my name is Paul McCartney
hello, my name is George Harrison
hello, my name is Ringo Starr

是否有推荐的处理此问题的方法?一种选择是为每个连接保留一个缓冲区,并检查消息是否已完成并且仅在发生这种情况时打印。但是对于 10k+ 连接,这是个好主意吗?一方面,有些事情告诉我这个解决方案不能很好地扩展。另一方面,如果消息只有 500 字节,有 10k 连接,这个解决方案只会占用 5MB。

提前致谢。

我认为在您的情况下,每个连接使用一个缓冲区是可以的。然而,为每个不完整的消息创建一个缓冲区可能更优雅。这意味着您必须以某种方式知道消息何时完成,因此您需要一个小协议,例如使用长度字段或终止符(以及可能在特定时间后终止不完整消息的超时)。这也将保证不会分配未使用的内存,因为缓冲区可以在消息完成并向上传递后立即释放。例如,您可以使用连接 5 元组作为键通过哈希图访问这些缓冲区。如果您决定使用消息绑定标识符,这当然会产生额外的开销,您甚至可以从用于一次传输多条消息的单个 tcp 连接中分离消息。

如果您需要强制对这些消息进行排序,您必须详细说明您的情况,因为在许多情况下排序是一个棘手的问题。

编辑:对不起,我现在有很多事情要做,所以我不能尽快回答。您是正确的,使用基于连接的方法更容易。使用的连接越稀疏,基于消息的优势就越大。如果您可以期望所有连接始终接收消息,那只是一种开销。如果连接有时空闲一段时间,它可能会大大减少内存使用量。另请注意,您的应用程序内存使用量不再与客户端数量成比例,而是与消息数量成比例,这通常很好,因为消息速率通常会有所不同。您对 TCP 流的排序也是正确的。只要您通过连接一次只发送一条完整的消息,TCP 就会确保顺序。一些应用程序,例如 HTTP2,重用相同的 TCP 连接来同时发送多条消息。在那种情况下,TCP 将无济于事,因为消息片段以未指定的顺序到达,您需要对它们进行多路分解(例如,通过 HTTP2 中的流 ID)。