在这个 mutex/pthread_cond_wait 结构中,我的数据会在哪里丢失?
Where would my data be getting lost at within this mutex/pthread_cond_wait structure?
最终编辑:我选择的答案说明了问题的解决方案。代表性示例代码显示在 diff here
编辑:post.
底部的完整可编译代码
我有这个基本的多线程服务器,它只接受一个连接,并且应该将文件描述符传递给一个线程,以允许该线程直接处理它,直到客户端断开连接。
出于某种原因,即使服务器内部有以下代码流,一些客户端也会“掉进裂缝”并陷入困境。 (他们永远不会被服务器处理,所以他们只是在接受连接后挂起)
以下块是我的服务器主 运行ning 循环:
while(g_serv.b_running)
{
//printf("Awaiting connection.\n");
client_fd = accept(g_serv.serv_listener_fd,
(struct sockaddr*)&cli_addr,
&clilen);
if (0 > client_fd)
{
fprintf(stderr,
"Error accepting connection. [%s]\n",
strerror(errno));
continue;
}
err = sem_trywait(&(g_serv.client_count_sem));
if (0 > err)
{
fprintf(stderr,
"Max connections reached. [%s]\n",
strerror(errno));
notify_client_max_connections(client_fd);
close(client_fd);
client_fd = 0;
continue;
}
printf("A client has connected.\n");
char byte[2] = "0";
err = send(client_fd, byte, 1, 0);
// Set up client FD in global position and wake up a thread to grab it
//
pthread_mutex_lock(&(g_serv.new_connection_fd_lock));
g_serv.new_connection_fd = client_fd;
if (0 != g_serv.new_connection_fd)
{
pthread_cond_signal(&(g_serv.new_connection));
}
pthread_mutex_unlock(&(g_serv.new_connection_fd_lock));
}
这个块是线程处理函数:
void* thread_handler(void* args)
{
serv_t* p_serv = (serv_t*)args;
bool thread_client_connected;
int thread_client_fd;
while(p_serv->b_running)
{
pthread_mutex_lock(&(p_serv->new_connection_fd_lock));
while (0 == p_serv->new_connection_fd && p_serv->b_running)
{
pthread_cond_wait(&(p_serv->new_connection),
&(p_serv->new_connection_fd_lock));
}
thread_client_fd = p_serv->new_connection_fd;
p_serv->new_connection_fd = 0;
pthread_mutex_unlock(&(p_serv->new_connection_fd_lock));
// In the case of a pthread cond broadcast for exiting the server.
//
if (0 == thread_client_fd)
{
continue;
}
thread_client_connected = true;
while (thread_client_connected)
{
thread_client_connected = handle_client(thread_client_fd);
}
close(thread_client_fd);
thread_client_fd = 0;
sem_post(&(p_serv->client_count_sem));
}
return NULL;
} /* thread_handler */
我的 serv_t 结构仅供参考:
typedef struct serv_t {
bool b_running;
int max_connections;
int serv_listener_fd;
sem_t client_count_sem;
pthread_mutex_t new_connection_fd_lock;
pthread_cond_t new_connection;
int new_connection_fd;
pthread_t* p_thread_ids;
} serv_t;
基本上,如果我 运行 netcat 或客户端程序通过 bash 命令将应用程序设置为“后台”,其中一些实例会卡住。我已经将输出重定向到一个文件,但发生的事情是 client/netcat 的特定实例在接受调用后卡住了。
更具体地说,如果我 运行 我的程序有两个线程,程序的一个实例会卡住并且没有后续副本会卡住,甚至 运行针对服务器的 6500 个实例也是如此。
如果我 运行 它有 10 个线程,多达 8 或 9 个实例卡住,但线程在服务器内仍然正常运行。
编辑:
我参考的客户端代码,从服务器开始让客户端知道服务器已准备好接收数据:
char buff[2] = { 0 };
err = recv(client_socket_fd, buff, 1, 0);
if ('0' != buff[0] && 1 != err)
{
fprintf(stderr,
"Server handshake error. [%s]\n",
strerror(errno));
close(client_socket_fd);
return EXIT_FAILURE;
}
if (NULL != p_infix_string)
{
if (MAX_BUFFER_SIZE < strlen(p_infix_string))
{
fprintf(stderr,
"Infix string is over 100 characters long.\n");
return EXIT_FAILURE;
}
errno = 0;
char* p_postfix = infix_to_postfix(p_infix_string);
if (EINVAL == errno || NULL == p_postfix)
{
fprintf(stderr, "Error converting provided string.\n");
}
bool success = send_postfix(p_postfix, client_socket_fd);
free(p_postfix);
if (false == success)
{
fprintf(stderr,
"An error occured while sending the equation to the server.\n");
close(client_socket_fd);
return EXIT_FAILURE;
}
}
客户端卡在此处的接收呼叫中:
bool send_postfix(char* p_postfix, int client_socket_fd)
{
if (NULL == p_postfix)
{
fprintf(stderr, "No postfix string provided to send to server.\n");
return false;
}
printf("Sending postfix to server\n");
int err = send(client_socket_fd,
p_postfix,
strnlen(p_postfix, MAX_BUFFER_SIZE),
0);
if(strnlen(p_postfix, MAX_BUFFER_SIZE) > err)
{
fprintf(stderr,
"Unable to send message to server. [%s]\n",
strerror(errno));
return false;
}
char response[MAX_BUFFER_SIZE] = { 0 };
printf("Waiting for receive\n");
err = recv(client_socket_fd, &response, MAX_BUFFER_SIZE, 0);
if (0 == err)
{
fprintf(stderr,
"Connection to server lost. [%s]\n",
strerror(errno));
return false;
}
else if (0 > err)
{
fprintf(stderr,
"Unable to receive message on socket. [%s]\n",
strerror(errno));
return false;
}
printf("Server responded with: \n%s\n", response);
return true;
} /* send_postfix */
编辑:https://github.com/TheStaplergun/Problem-Code
我将代码上传到这个 repo 并删除了我使用的无关文件的需要,并用占位符填充它们。
您可以使用带有命令 ./postfix_server -p 8888 -n 2
的服务器重现此问题,并使用 for i in {1..4}; do ./postfix_client -i 127.0.0.1 -p 8888 -e "3 + $i" &> $i.txt & done
在另一个终端中使用客户端问题
由于客户端顶部的setbuf,每个客户端的输出都会被强制刷新。 运行 吧,看看有没有程序挂了,如果没有运行 那个命令又来了。只需键入 PS 并查看其中一个是否挂起,然后查看生成的文本文件。您会看到它卡在接听电话处。
如果您登录服务器 (CTRL + C
),被卡住的客户端将关闭并返回来自服务器的 Connection reset by peer
响应,因此服务器仍然将该文件描述符锁定在某处。
我相信竞争条件正在以某种方式发生,因为它只是随机发生。
奇怪的是它只在每个服务器实例中发生一次。
如果我杀死那个挂起的实例并继续执行 10000 次它在服务器重置之前不会再挂起。
For some reason, even with the following code flow inside of the
server, some clients "Fall through the cracks" and get stuck in limbo.
(They never get handled by the server so they just hang after
accepting the connection)
可能还有其他问题,但我看到的第一个问题是主循环无法确保在尝试移交下一个连接之前,任何处理程序线程实际上都拾取了一个新连接。即使在接受新连接时已经在 CV 上阻塞了处理程序线程,主服务器线程也可能向 CV 发出信号,循环返回,接受另一个连接,重新获取互斥量,并覆盖 new-connection 在任何处理程序线程拾取前一个之前的 FD。如果您的线程数多于内核数,则发生这种情况的可能性会增加。
请注意,这也会干扰您的 semaphore-based 可用处理程序的计数——您为每个接受的信号量递减信号量,但仅对成功处理的信号量再次递增。
有多种方法可以使主服务器线程等待处理程序拾取新连接。一组将涉及服务器等待 CV 本身,并依靠处理程序在获取连接后向其发送信号。另一种可能更简单的方法涉及使用信号量来达到类似的效果。但我建议 不要 等待,而是为可用连接创建一个 thread-safe 队列,这样服务器就不必等待。如果这对您有用的话,这甚至允许排队比目前可用的处理程序更多的连接。
最终编辑:我选择的答案说明了问题的解决方案。代表性示例代码显示在 diff here
编辑:post.
底部的完整可编译代码我有这个基本的多线程服务器,它只接受一个连接,并且应该将文件描述符传递给一个线程,以允许该线程直接处理它,直到客户端断开连接。
出于某种原因,即使服务器内部有以下代码流,一些客户端也会“掉进裂缝”并陷入困境。 (他们永远不会被服务器处理,所以他们只是在接受连接后挂起)
以下块是我的服务器主 运行ning 循环:
while(g_serv.b_running)
{
//printf("Awaiting connection.\n");
client_fd = accept(g_serv.serv_listener_fd,
(struct sockaddr*)&cli_addr,
&clilen);
if (0 > client_fd)
{
fprintf(stderr,
"Error accepting connection. [%s]\n",
strerror(errno));
continue;
}
err = sem_trywait(&(g_serv.client_count_sem));
if (0 > err)
{
fprintf(stderr,
"Max connections reached. [%s]\n",
strerror(errno));
notify_client_max_connections(client_fd);
close(client_fd);
client_fd = 0;
continue;
}
printf("A client has connected.\n");
char byte[2] = "0";
err = send(client_fd, byte, 1, 0);
// Set up client FD in global position and wake up a thread to grab it
//
pthread_mutex_lock(&(g_serv.new_connection_fd_lock));
g_serv.new_connection_fd = client_fd;
if (0 != g_serv.new_connection_fd)
{
pthread_cond_signal(&(g_serv.new_connection));
}
pthread_mutex_unlock(&(g_serv.new_connection_fd_lock));
}
这个块是线程处理函数:
void* thread_handler(void* args)
{
serv_t* p_serv = (serv_t*)args;
bool thread_client_connected;
int thread_client_fd;
while(p_serv->b_running)
{
pthread_mutex_lock(&(p_serv->new_connection_fd_lock));
while (0 == p_serv->new_connection_fd && p_serv->b_running)
{
pthread_cond_wait(&(p_serv->new_connection),
&(p_serv->new_connection_fd_lock));
}
thread_client_fd = p_serv->new_connection_fd;
p_serv->new_connection_fd = 0;
pthread_mutex_unlock(&(p_serv->new_connection_fd_lock));
// In the case of a pthread cond broadcast for exiting the server.
//
if (0 == thread_client_fd)
{
continue;
}
thread_client_connected = true;
while (thread_client_connected)
{
thread_client_connected = handle_client(thread_client_fd);
}
close(thread_client_fd);
thread_client_fd = 0;
sem_post(&(p_serv->client_count_sem));
}
return NULL;
} /* thread_handler */
我的 serv_t 结构仅供参考:
typedef struct serv_t {
bool b_running;
int max_connections;
int serv_listener_fd;
sem_t client_count_sem;
pthread_mutex_t new_connection_fd_lock;
pthread_cond_t new_connection;
int new_connection_fd;
pthread_t* p_thread_ids;
} serv_t;
基本上,如果我 运行 netcat 或客户端程序通过 bash 命令将应用程序设置为“后台”,其中一些实例会卡住。我已经将输出重定向到一个文件,但发生的事情是 client/netcat 的特定实例在接受调用后卡住了。
更具体地说,如果我 运行 我的程序有两个线程,程序的一个实例会卡住并且没有后续副本会卡住,甚至 运行针对服务器的 6500 个实例也是如此。
如果我 运行 它有 10 个线程,多达 8 或 9 个实例卡住,但线程在服务器内仍然正常运行。
编辑:
我参考的客户端代码,从服务器开始让客户端知道服务器已准备好接收数据:
char buff[2] = { 0 };
err = recv(client_socket_fd, buff, 1, 0);
if ('0' != buff[0] && 1 != err)
{
fprintf(stderr,
"Server handshake error. [%s]\n",
strerror(errno));
close(client_socket_fd);
return EXIT_FAILURE;
}
if (NULL != p_infix_string)
{
if (MAX_BUFFER_SIZE < strlen(p_infix_string))
{
fprintf(stderr,
"Infix string is over 100 characters long.\n");
return EXIT_FAILURE;
}
errno = 0;
char* p_postfix = infix_to_postfix(p_infix_string);
if (EINVAL == errno || NULL == p_postfix)
{
fprintf(stderr, "Error converting provided string.\n");
}
bool success = send_postfix(p_postfix, client_socket_fd);
free(p_postfix);
if (false == success)
{
fprintf(stderr,
"An error occured while sending the equation to the server.\n");
close(client_socket_fd);
return EXIT_FAILURE;
}
}
客户端卡在此处的接收呼叫中:
bool send_postfix(char* p_postfix, int client_socket_fd)
{
if (NULL == p_postfix)
{
fprintf(stderr, "No postfix string provided to send to server.\n");
return false;
}
printf("Sending postfix to server\n");
int err = send(client_socket_fd,
p_postfix,
strnlen(p_postfix, MAX_BUFFER_SIZE),
0);
if(strnlen(p_postfix, MAX_BUFFER_SIZE) > err)
{
fprintf(stderr,
"Unable to send message to server. [%s]\n",
strerror(errno));
return false;
}
char response[MAX_BUFFER_SIZE] = { 0 };
printf("Waiting for receive\n");
err = recv(client_socket_fd, &response, MAX_BUFFER_SIZE, 0);
if (0 == err)
{
fprintf(stderr,
"Connection to server lost. [%s]\n",
strerror(errno));
return false;
}
else if (0 > err)
{
fprintf(stderr,
"Unable to receive message on socket. [%s]\n",
strerror(errno));
return false;
}
printf("Server responded with: \n%s\n", response);
return true;
} /* send_postfix */
编辑:https://github.com/TheStaplergun/Problem-Code
我将代码上传到这个 repo 并删除了我使用的无关文件的需要,并用占位符填充它们。
您可以使用带有命令 ./postfix_server -p 8888 -n 2
的服务器重现此问题,并使用 for i in {1..4}; do ./postfix_client -i 127.0.0.1 -p 8888 -e "3 + $i" &> $i.txt & done
由于客户端顶部的setbuf,每个客户端的输出都会被强制刷新。 运行 吧,看看有没有程序挂了,如果没有运行 那个命令又来了。只需键入 PS 并查看其中一个是否挂起,然后查看生成的文本文件。您会看到它卡在接听电话处。
如果您登录服务器 (CTRL + C
),被卡住的客户端将关闭并返回来自服务器的 Connection reset by peer
响应,因此服务器仍然将该文件描述符锁定在某处。
我相信竞争条件正在以某种方式发生,因为它只是随机发生。
奇怪的是它只在每个服务器实例中发生一次。
如果我杀死那个挂起的实例并继续执行 10000 次它在服务器重置之前不会再挂起。
For some reason, even with the following code flow inside of the server, some clients "Fall through the cracks" and get stuck in limbo. (They never get handled by the server so they just hang after accepting the connection)
可能还有其他问题,但我看到的第一个问题是主循环无法确保在尝试移交下一个连接之前,任何处理程序线程实际上都拾取了一个新连接。即使在接受新连接时已经在 CV 上阻塞了处理程序线程,主服务器线程也可能向 CV 发出信号,循环返回,接受另一个连接,重新获取互斥量,并覆盖 new-connection 在任何处理程序线程拾取前一个之前的 FD。如果您的线程数多于内核数,则发生这种情况的可能性会增加。
请注意,这也会干扰您的 semaphore-based 可用处理程序的计数——您为每个接受的信号量递减信号量,但仅对成功处理的信号量再次递增。
有多种方法可以使主服务器线程等待处理程序拾取新连接。一组将涉及服务器等待 CV 本身,并依靠处理程序在获取连接后向其发送信号。另一种可能更简单的方法涉及使用信号量来达到类似的效果。但我建议 不要 等待,而是为可用连接创建一个 thread-safe 队列,这样服务器就不必等待。如果这对您有用的话,这甚至允许排队比目前可用的处理程序更多的连接。