TCP Socket多路复用发送大数据

TCP Socket Multiplexing Send Large Data

TCP 套接字多路复用遇到一些问题。

      //socket is non-blocking
      const int MAX = 4096;
      char *buff[MAX];
      char *p = buff;
      int fd, rvalue;
      rvalue = 0;

      if ( (fd = open(path, O_RDONLY)) < 0 ) {
          return errno;
      } else {
        int didsend, didread;
        int shouldsend;
        while ((didread = read(fd, buff, MAX)) > 0) {
          p = buff;
          shouldsend = didread;
          while ( 1 ) {
            didsend = send(sockfd, p, shouldsend, 0);
            //if send succeeds and returns the number of bytes fewer than asked for then try to send rest part in next time.
            if (didsend < shouldsend) {
              p += didsent;
              shouldsend -= didsend;
              continue;
            }
            //if there is no place for new data to send, then wait a brief time and try again.
            if ( didsend < 0 && (errno == EWOULDBLOCK || errno == EAGAIN) ) {
              usleep(1000);
              continue;
            }
            //if all data has been sent then sending loop is over.
            if (didsend == shouldsend) {
              break;
            }
            //send error
            if ( didsend < 0 ) {
              rvalue = errno;
              break;
            }
          }
        }
        close(fd);
        if (didread == -1) {
          return errno;
        }
        return rvalue;
      }

假设我使用I/O多路复用函数poll()或kqueue(),以及非阻塞套接字,那么如果只有一些小数据,比如发送一条短信,它就可以正常工作。

但是如果涉及到大数据,我的意思是大于send()的缓冲区大小,因为使用非阻塞套接字,send()只会发送一部分数据,return如何它发送的数据很多,其余部分数据只能在另一个send()调用中发送,但需要时间,而且不知道需要多长时间。所以第二个 while() 实际上是一个使用非阻塞套接字的阻塞发送。

相当于:

  //socket is blocking
  const int MAX = 4096;
  char *buff[MAX];
  int fd, n;
  if ( (fd = open(path, O_RDONLY)) < 0 ) {
      return errno;
  } else {
    while ((n = read(fd, buff, MAX)) > 0) {
      if (send(sockfd, buff, n, 0) < 0) {
        return errno;
      }
    }
    close(fd);
    return 0;
  }

那么,解决这个问题的方法是什么,多线程可能会起作用,但这可能会浪费资源。

这是使用多个连接和非阻塞套接字的单线程服务器的一般模式。

它主要是 C 中的伪代码,不进行必要的错误检查。但它给了你一个想法,即对于每个接受的连接,你保留一个结构实例来维护套接字句柄、请求解析状态、响应流和该连接的任何其他“状态”成员。然后你只需循环使用“select”等待或让多个线程做同样的事情。

同样,这只是伪代码,并以 select/poll 为例。您可以使用 epoll 获得更大的可扩展性。

while (1)
{
    fd_set readset = {};
    fd_set writeset = {};

    for (int i = 0; i < number_of_client_connections; i++)
    {
        if (client_connections[i].reading_request)
            FD_SET(client_connection.sock, &readset);
        else
            FD_SET(client_connection.sock, &writeset);
    }

    // add the listen socket to the read set
    FD_SET(listen_socket, &readset);

    select(n + 1, &readset, &writeset, &timeout); // wait for a socket to be ready (not shown - check for errors and return value)

    if (FD_ISSET(listen_socket, &readset))
    {
        int new_client_socket = accept(listen_socket, &addr, &addrlength);

        // create a struct that keeps track of the connection state data
        struct ConnectionData client_connection = {};
        client_connection.sock = new_client_socket;
        client_connection.reading_request = 1;  // awaiting for all the request bytes to come in
        client_connections[number_of_client_connections++] = client_connection;  // pseudo code, add the client_connection to the list
    }


    for (int i = 0; i < number_of_client_connections; i++)
    {
        if (client_connections[i].reading_request)
        {
            if (FD_ISSET(client_connections[i], &readset))
            {
                char buffer[2000];
                int len = recv(client_connections[i].sock, buffer, 2000, 0);
                // not shown - handle error case when (recv < 0)
                // not shown - handle case when (recv == 0)
                ProcessIncomingData(client_connections[i], buffer, len);  // do all the request parsing here.  Flip the client_connections[i].reading_request to 0 if ready to respond
            }
        }
        else if (client_connections[i].reading_request == 0)
        {
            if (FD_ISSET(client_connections[i], &writeset))
            {
                client_connection* conn = &client_connections[i];
                int len = send(conn->sock, conn->response_buffer + conn->txCount, conn->response_size - conn->txCount, 0);
                conn->txCount += len;

                if (conn->txCount == conn->response_size)
                {
                    // done sending response - we can close this connection or change it to back to the reading state
                }
            }
        }
    }