TCP Socket多路复用发送大数据
TCP Socket Multiplexing Send Large Data
TCP 套接字多路复用遇到一些问题。
//socket is non-blocking
const int MAX = 4096;
char *buff[MAX];
char *p = buff;
int fd, rvalue;
rvalue = 0;
if ( (fd = open(path, O_RDONLY)) < 0 ) {
return errno;
} else {
int didsend, didread;
int shouldsend;
while ((didread = read(fd, buff, MAX)) > 0) {
p = buff;
shouldsend = didread;
while ( 1 ) {
didsend = send(sockfd, p, shouldsend, 0);
//if send succeeds and returns the number of bytes fewer than asked for then try to send rest part in next time.
if (didsend < shouldsend) {
p += didsent;
shouldsend -= didsend;
continue;
}
//if there is no place for new data to send, then wait a brief time and try again.
if ( didsend < 0 && (errno == EWOULDBLOCK || errno == EAGAIN) ) {
usleep(1000);
continue;
}
//if all data has been sent then sending loop is over.
if (didsend == shouldsend) {
break;
}
//send error
if ( didsend < 0 ) {
rvalue = errno;
break;
}
}
}
close(fd);
if (didread == -1) {
return errno;
}
return rvalue;
}
假设我使用I/O多路复用函数poll()或kqueue(),以及非阻塞套接字,那么如果只有一些小数据,比如发送一条短信,它就可以正常工作。
但是如果涉及到大数据,我的意思是大于send()的缓冲区大小,因为使用非阻塞套接字,send()只会发送一部分数据,return如何它发送的数据很多,其余部分数据只能在另一个send()调用中发送,但需要时间,而且不知道需要多长时间。所以第二个 while() 实际上是一个使用非阻塞套接字的阻塞发送。
相当于:
//socket is blocking
const int MAX = 4096;
char *buff[MAX];
int fd, n;
if ( (fd = open(path, O_RDONLY)) < 0 ) {
return errno;
} else {
while ((n = read(fd, buff, MAX)) > 0) {
if (send(sockfd, buff, n, 0) < 0) {
return errno;
}
}
close(fd);
return 0;
}
那么,解决这个问题的方法是什么,多线程可能会起作用,但这可能会浪费资源。
这是使用多个连接和非阻塞套接字的单线程服务器的一般模式。
它主要是 C 中的伪代码,不进行必要的错误检查。但它给了你一个想法,即对于每个接受的连接,你保留一个结构实例来维护套接字句柄、请求解析状态、响应流和该连接的任何其他“状态”成员。然后你只需循环使用“select”等待或让多个线程做同样的事情。
同样,这只是伪代码,并以 select/poll 为例。您可以使用 epoll 获得更大的可扩展性。
while (1)
{
fd_set readset = {};
fd_set writeset = {};
for (int i = 0; i < number_of_client_connections; i++)
{
if (client_connections[i].reading_request)
FD_SET(client_connection.sock, &readset);
else
FD_SET(client_connection.sock, &writeset);
}
// add the listen socket to the read set
FD_SET(listen_socket, &readset);
select(n + 1, &readset, &writeset, &timeout); // wait for a socket to be ready (not shown - check for errors and return value)
if (FD_ISSET(listen_socket, &readset))
{
int new_client_socket = accept(listen_socket, &addr, &addrlength);
// create a struct that keeps track of the connection state data
struct ConnectionData client_connection = {};
client_connection.sock = new_client_socket;
client_connection.reading_request = 1; // awaiting for all the request bytes to come in
client_connections[number_of_client_connections++] = client_connection; // pseudo code, add the client_connection to the list
}
for (int i = 0; i < number_of_client_connections; i++)
{
if (client_connections[i].reading_request)
{
if (FD_ISSET(client_connections[i], &readset))
{
char buffer[2000];
int len = recv(client_connections[i].sock, buffer, 2000, 0);
// not shown - handle error case when (recv < 0)
// not shown - handle case when (recv == 0)
ProcessIncomingData(client_connections[i], buffer, len); // do all the request parsing here. Flip the client_connections[i].reading_request to 0 if ready to respond
}
}
else if (client_connections[i].reading_request == 0)
{
if (FD_ISSET(client_connections[i], &writeset))
{
client_connection* conn = &client_connections[i];
int len = send(conn->sock, conn->response_buffer + conn->txCount, conn->response_size - conn->txCount, 0);
conn->txCount += len;
if (conn->txCount == conn->response_size)
{
// done sending response - we can close this connection or change it to back to the reading state
}
}
}
}
TCP 套接字多路复用遇到一些问题。
//socket is non-blocking
const int MAX = 4096;
char *buff[MAX];
char *p = buff;
int fd, rvalue;
rvalue = 0;
if ( (fd = open(path, O_RDONLY)) < 0 ) {
return errno;
} else {
int didsend, didread;
int shouldsend;
while ((didread = read(fd, buff, MAX)) > 0) {
p = buff;
shouldsend = didread;
while ( 1 ) {
didsend = send(sockfd, p, shouldsend, 0);
//if send succeeds and returns the number of bytes fewer than asked for then try to send rest part in next time.
if (didsend < shouldsend) {
p += didsent;
shouldsend -= didsend;
continue;
}
//if there is no place for new data to send, then wait a brief time and try again.
if ( didsend < 0 && (errno == EWOULDBLOCK || errno == EAGAIN) ) {
usleep(1000);
continue;
}
//if all data has been sent then sending loop is over.
if (didsend == shouldsend) {
break;
}
//send error
if ( didsend < 0 ) {
rvalue = errno;
break;
}
}
}
close(fd);
if (didread == -1) {
return errno;
}
return rvalue;
}
假设我使用I/O多路复用函数poll()或kqueue(),以及非阻塞套接字,那么如果只有一些小数据,比如发送一条短信,它就可以正常工作。
但是如果涉及到大数据,我的意思是大于send()的缓冲区大小,因为使用非阻塞套接字,send()只会发送一部分数据,return如何它发送的数据很多,其余部分数据只能在另一个send()调用中发送,但需要时间,而且不知道需要多长时间。所以第二个 while() 实际上是一个使用非阻塞套接字的阻塞发送。
相当于:
//socket is blocking
const int MAX = 4096;
char *buff[MAX];
int fd, n;
if ( (fd = open(path, O_RDONLY)) < 0 ) {
return errno;
} else {
while ((n = read(fd, buff, MAX)) > 0) {
if (send(sockfd, buff, n, 0) < 0) {
return errno;
}
}
close(fd);
return 0;
}
那么,解决这个问题的方法是什么,多线程可能会起作用,但这可能会浪费资源。
这是使用多个连接和非阻塞套接字的单线程服务器的一般模式。
它主要是 C 中的伪代码,不进行必要的错误检查。但它给了你一个想法,即对于每个接受的连接,你保留一个结构实例来维护套接字句柄、请求解析状态、响应流和该连接的任何其他“状态”成员。然后你只需循环使用“select”等待或让多个线程做同样的事情。
同样,这只是伪代码,并以 select/poll 为例。您可以使用 epoll 获得更大的可扩展性。
while (1)
{
fd_set readset = {};
fd_set writeset = {};
for (int i = 0; i < number_of_client_connections; i++)
{
if (client_connections[i].reading_request)
FD_SET(client_connection.sock, &readset);
else
FD_SET(client_connection.sock, &writeset);
}
// add the listen socket to the read set
FD_SET(listen_socket, &readset);
select(n + 1, &readset, &writeset, &timeout); // wait for a socket to be ready (not shown - check for errors and return value)
if (FD_ISSET(listen_socket, &readset))
{
int new_client_socket = accept(listen_socket, &addr, &addrlength);
// create a struct that keeps track of the connection state data
struct ConnectionData client_connection = {};
client_connection.sock = new_client_socket;
client_connection.reading_request = 1; // awaiting for all the request bytes to come in
client_connections[number_of_client_connections++] = client_connection; // pseudo code, add the client_connection to the list
}
for (int i = 0; i < number_of_client_connections; i++)
{
if (client_connections[i].reading_request)
{
if (FD_ISSET(client_connections[i], &readset))
{
char buffer[2000];
int len = recv(client_connections[i].sock, buffer, 2000, 0);
// not shown - handle error case when (recv < 0)
// not shown - handle case when (recv == 0)
ProcessIncomingData(client_connections[i], buffer, len); // do all the request parsing here. Flip the client_connections[i].reading_request to 0 if ready to respond
}
}
else if (client_connections[i].reading_request == 0)
{
if (FD_ISSET(client_connections[i], &writeset))
{
client_connection* conn = &client_connections[i];
int len = send(conn->sock, conn->response_buffer + conn->txCount, conn->response_size - conn->txCount, 0);
conn->txCount += len;
if (conn->txCount == conn->response_size)
{
// done sending response - we can close this connection or change it to back to the reading state
}
}
}
}