C 中简单线程池和 TCP 侦听器的问题
Problems with simple threadpool and TCP listener in C
我用简单的线程池逻辑在 c 中实现了一个多线程 TCP 侦听器,在多个终端中使用 for((i=1;i<100000;i++)) do echo "Hi $i" |nc 127.0.0.1 2000;
测试后我遇到了两个主要问题:
- 我会在超过 30000 个数据包后得到 'pthread: Cannot allocate memory',即使我使用 pthread_exit() 和 pthread_join(有 5gb 可用内存和 ps -hH 仅显示 4到 5 个线程)只有当我使用 pthread_detach(pthread_self()).
时它才会工作
- 当我使用 pthread_detach(pthread_self()) 时,我会在输出中得到一些额外的字符,在我对 [ 中的本地缓冲区变量使用 memset(&buffer,0,sizeof buffer) =31=] 功能一切正常,但我想我的代码中存在问题导致了这个问题。
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#define handle_err(msg) \
do \
{ \
perror(msg); \
exit(EXIT_FAILURE); \
} while (0)
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
#define MAX_ACCEPT_THREAD 200
uint empty_thread_sp = 1;
int empty_thread[MAX_ACCEPT_THREAD];
pthread_t *process_thread;
struct ip_args
{
int port;
int ipaddr;
};
struct client_thread
{
unsigned int cfd;
int thread_number;
};
int create_thread_pool()
{
int i = 0;
for (i = 0; i < MAX_ACCEPT_THREAD; i++)
{
empty_thread[i] = i;
}
process_thread =malloc(MAX_ACCEPT_THREAD * sizeof *process_thread);
}
int get_thread()
{
if (empty_thread_sp < MAX_ACCEPT_THREAD)
{
pthread_mutex_lock(&lock);
empty_thread_sp++;
printf("ThreadNO_in_Get=%d\n", empty_thread[empty_thread_sp]);
pthread_mutex_unlock(&lock);
return empty_thread[empty_thread_sp];
}
else
{
return get_thread();
}
}
int release_thread(struct client_thread *ct)
{
if (empty_thread_sp > 0)
{
pthread_mutex_lock(&lock);
empty_thread[--empty_thread_sp] = ct->thread_number;
printf("ThreadNO_in_R=%d\n", empty_thread[empty_thread_sp]);
pthread_mutex_unlock(&lock);
}
else
{
return 0;
}
}
void *handle_client(void *arg)
{
pthread_detach(pthread_self());
int no;
char buffer[1024];
//TODO is this ok?
memset(&buffer,0,sizeof buffer);
struct client_thread *ct = arg;
int n;
n = read(ct->cfd, buffer, 1024);
printf("%s", buffer);
write(ct->cfd, &buffer, strlen(buffer));
close(ct->cfd);
release_thread(ct);
no = ct->thread_number;
free(ct);
//TODO the following lines do not work
//pthread_exit(NULL);
//pthread_join(process_thread[no],NULL);
}
void *start_listen(void *args)
{
struct ip_args *listen_addr = args;
unsigned int sfd;
struct client_thread *ct;
struct sockaddr_in my_addr;
sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd == -1)
handle_err("socket");
printf("%d\n", sfd);
memset(&my_addr, 0, sizeof(my_addr));
my_addr.sin_family = AF_INET;
my_addr.sin_addr.s_addr = htonl(listen_addr->ipaddr);
my_addr.sin_port = htons(listen_addr->port);
struct sockaddr_in peer_addr;
if (bind(sfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) < 0)
handle_err("bind");
if (listen(sfd, 10) == -1)
handle_err("listen");
while (1)
{
ct= malloc(sizeof *ct);
socklen_t addr_size = sizeof peer_addr;
ct->cfd = accept(sfd, (struct sockaddr *)&peer_addr, &addr_size);
if (ct->cfd == 1)
handle_err("accept");
ct->thread_number = 0;
ct->thread_number = get_thread();
printf("Thread_number = %d\n", ct->thread_number);
if (pthread_create(&process_thread[ct->thread_number], NULL, handle_client, (void *)ct) != 0)
handle_err("pthread");
}
}
int main()
{
create_thread_pool();
/* 1- socket 2-bind 3-listen 4-accept*/
struct ip_args listen_addr1, listen_addr2, control_addr;
listen_addr1.ipaddr = INADDR_ANY;
listen_addr1.port = 2000;
listen_addr2.ipaddr = INADDR_ANY;
listen_addr2.port = 3000;
control_addr.ipaddr = INADDR_LOOPBACK;
control_addr.port = 57000;
pthread_t listen_thread[3];
pthread_create(&listen_thread[0], NULL, start_listen, (void *)&listen_addr1);
pthread_create(&listen_thread[1], NULL, start_listen, (void *)&listen_addr2);
//pthread_create(&listen_thread[2], NULL, start_listen, (void *)&control_addr);
start_listen((void *)&control_addr);
return 0;
}
已编辑:
@terehpp 示例工作正常但它只创建 3 个进程线程,基于 @Martin James 和 @terehpp 建议我应该改变我的设计。
关于基于@terehpp 提示的第二个问题,我应该在本地非静态变量上使用 memset(&buffer,0,sizeof buffer)。
如果我们只讨论 pthread_join 问题而不涉及设计问题:
- 你应该调用 pthread_join 尽可能接近 pthread_create (如果你有可连接的线程)。
我删除 pthread_detach 并在 start_listen 函数中的 pthread_create 之后调用 pthread_join。
您还应该在分配资源的函数中释放资源。
所以我更换
free(ct);
从 handle_client
到 start_listen
像这样
if (pthread_create(&process_thread[ct->thread_number], NULL, handle_client, (void *)ct) != 0) {
handle_err("pthread");
} else {
pthread_join(process_thread[ct->thread_number], NULL);
// As long as we have threadpool we should reallocate resources for
// process_thread[ct->thread_number]
// because after pthread_join it will be free automaticly
//I just catch this error with coredump, so.....
process_thread[ct->thread_number] = malloc(sizeof(pthread_t));
// But I am not really sure this good solution, I think malloc at this place far far away from }good solution
} }
free(ct);
所以,现在它可以正常工作而不会崩溃
for((i=1;i<100000;i++)) do echo "Hi $i" |nc 127.0.0.1 2000;在多个终端
- 我无法重现您的第二期。 pthread_detach 的代码在我的机器上运行良好。
修复分段错误评论
我更改了 get_thread() 和 release_thread() 方法,还更改了 start_listener 和处理程序方法:
int empty_thread_sp = MAX_ACCEPT_THREAD - 1;
...
int get_thread()
{
while(1) {//My eyes bleeding, too....
pthread_mutex_lock(&lock);
if (empty_thread_sp >= 0) {
int thread;
printf("ThreadNO_in_Get=%d, index=%d\n",empty_thread[empty_thread_sp], empty_thread_sp);
thread = empty_thread[empty_thread_sp];
empty_thread_sp -= 1;
pthread_mutex_unlock(&lock);
return thread;
}
pthread_mutex_unlock(&lock);
}
}
int release_thread(struct client_thread *ct)
{
pthread_mutex_lock(&lock);
empty_thread_sp += 1;
empty_thread[empty_thread_sp] = ct->thread_number;
printf("ThreadNO_in_R=%d, index=%d\n", empty_thread[empty_thread_sp], empty_thread_sp);
pthread_mutex_unlock(&lock);
return 0;
}
void *handle_client(void *arg)
{
int no;
char buffer[1024];
memset(&buffer,0,sizeof buffer);
struct client_thread *ct = arg;
int n;
n = read(ct->cfd, buffer, 1024);
printf("%s", buffer);
write(ct->cfd, &buffer, strlen(buffer));
close(ct->cfd);
return NULL;
}
void *start_listen(void *args)
{
struct ip_args *listen_addr = args;
unsigned int sfd;
struct client_thread *ct;
struct sockaddr_in my_addr;
sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd == -1)
handle_err("socket");
printf("%d\n", sfd);
memset(&my_addr, 0, sizeof(my_addr));
my_addr.sin_family = AF_INET;
my_addr.sin_addr.s_addr = htonl(listen_addr->ipaddr);
my_addr.sin_port = htons(listen_addr->port);
struct sockaddr_in peer_addr;
if (bind(sfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) < 0)
handle_err("bind");
if (listen(sfd, 10) == -1)
handle_err("listen");
while (1)
{
ct= malloc(sizeof *ct);
socklen_t addr_size = sizeof peer_addr;
ct->cfd = accept(sfd, (struct sockaddr *)&peer_addr, &addr_size);
if (ct->cfd == 1)
handle_err("accept");
ct->thread_number = get_thread();
printf("Thread_number = %d\n", ct->thread_number);
if (pthread_create(&process_thread[ct->thread_number], NULL, handle_client, (void *)ct) != 0){
handle_err("pthread");
} else {
pthread_join(process_thread[ct->thread_number], NULL);
release_thread(ct);
}
free(ct);
}
}
我用简单的线程池逻辑在 c 中实现了一个多线程 TCP 侦听器,在多个终端中使用 for((i=1;i<100000;i++)) do echo "Hi $i" |nc 127.0.0.1 2000;
测试后我遇到了两个主要问题:
- 我会在超过 30000 个数据包后得到 'pthread: Cannot allocate memory',即使我使用 pthread_exit() 和 pthread_join(有 5gb 可用内存和 ps -hH 仅显示 4到 5 个线程)只有当我使用 pthread_detach(pthread_self()). 时它才会工作
- 当我使用 pthread_detach(pthread_self()) 时,我会在输出中得到一些额外的字符,在我对 [ 中的本地缓冲区变量使用 memset(&buffer,0,sizeof buffer) =31=] 功能一切正常,但我想我的代码中存在问题导致了这个问题。
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#define handle_err(msg) \
do \
{ \
perror(msg); \
exit(EXIT_FAILURE); \
} while (0)
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
#define MAX_ACCEPT_THREAD 200
uint empty_thread_sp = 1;
int empty_thread[MAX_ACCEPT_THREAD];
pthread_t *process_thread;
struct ip_args
{
int port;
int ipaddr;
};
struct client_thread
{
unsigned int cfd;
int thread_number;
};
int create_thread_pool()
{
int i = 0;
for (i = 0; i < MAX_ACCEPT_THREAD; i++)
{
empty_thread[i] = i;
}
process_thread =malloc(MAX_ACCEPT_THREAD * sizeof *process_thread);
}
int get_thread()
{
if (empty_thread_sp < MAX_ACCEPT_THREAD)
{
pthread_mutex_lock(&lock);
empty_thread_sp++;
printf("ThreadNO_in_Get=%d\n", empty_thread[empty_thread_sp]);
pthread_mutex_unlock(&lock);
return empty_thread[empty_thread_sp];
}
else
{
return get_thread();
}
}
int release_thread(struct client_thread *ct)
{
if (empty_thread_sp > 0)
{
pthread_mutex_lock(&lock);
empty_thread[--empty_thread_sp] = ct->thread_number;
printf("ThreadNO_in_R=%d\n", empty_thread[empty_thread_sp]);
pthread_mutex_unlock(&lock);
}
else
{
return 0;
}
}
void *handle_client(void *arg)
{
pthread_detach(pthread_self());
int no;
char buffer[1024];
//TODO is this ok?
memset(&buffer,0,sizeof buffer);
struct client_thread *ct = arg;
int n;
n = read(ct->cfd, buffer, 1024);
printf("%s", buffer);
write(ct->cfd, &buffer, strlen(buffer));
close(ct->cfd);
release_thread(ct);
no = ct->thread_number;
free(ct);
//TODO the following lines do not work
//pthread_exit(NULL);
//pthread_join(process_thread[no],NULL);
}
void *start_listen(void *args)
{
struct ip_args *listen_addr = args;
unsigned int sfd;
struct client_thread *ct;
struct sockaddr_in my_addr;
sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd == -1)
handle_err("socket");
printf("%d\n", sfd);
memset(&my_addr, 0, sizeof(my_addr));
my_addr.sin_family = AF_INET;
my_addr.sin_addr.s_addr = htonl(listen_addr->ipaddr);
my_addr.sin_port = htons(listen_addr->port);
struct sockaddr_in peer_addr;
if (bind(sfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) < 0)
handle_err("bind");
if (listen(sfd, 10) == -1)
handle_err("listen");
while (1)
{
ct= malloc(sizeof *ct);
socklen_t addr_size = sizeof peer_addr;
ct->cfd = accept(sfd, (struct sockaddr *)&peer_addr, &addr_size);
if (ct->cfd == 1)
handle_err("accept");
ct->thread_number = 0;
ct->thread_number = get_thread();
printf("Thread_number = %d\n", ct->thread_number);
if (pthread_create(&process_thread[ct->thread_number], NULL, handle_client, (void *)ct) != 0)
handle_err("pthread");
}
}
int main()
{
create_thread_pool();
/* 1- socket 2-bind 3-listen 4-accept*/
struct ip_args listen_addr1, listen_addr2, control_addr;
listen_addr1.ipaddr = INADDR_ANY;
listen_addr1.port = 2000;
listen_addr2.ipaddr = INADDR_ANY;
listen_addr2.port = 3000;
control_addr.ipaddr = INADDR_LOOPBACK;
control_addr.port = 57000;
pthread_t listen_thread[3];
pthread_create(&listen_thread[0], NULL, start_listen, (void *)&listen_addr1);
pthread_create(&listen_thread[1], NULL, start_listen, (void *)&listen_addr2);
//pthread_create(&listen_thread[2], NULL, start_listen, (void *)&control_addr);
start_listen((void *)&control_addr);
return 0;
}
已编辑: @terehpp 示例工作正常但它只创建 3 个进程线程,基于 @Martin James 和 @terehpp 建议我应该改变我的设计。 关于基于@terehpp 提示的第二个问题,我应该在本地非静态变量上使用 memset(&buffer,0,sizeof buffer)。
如果我们只讨论 pthread_join 问题而不涉及设计问题:
- 你应该调用 pthread_join 尽可能接近 pthread_create (如果你有可连接的线程)。
我删除 pthread_detach 并在 start_listen 函数中的 pthread_create 之后调用 pthread_join。
您还应该在分配资源的函数中释放资源。
所以我更换
free(ct);
从handle_client
到start_listen
像这样
if (pthread_create(&process_thread[ct->thread_number], NULL, handle_client, (void *)ct) != 0) {
handle_err("pthread");
} else {
pthread_join(process_thread[ct->thread_number], NULL);
// As long as we have threadpool we should reallocate resources for
// process_thread[ct->thread_number]
// because after pthread_join it will be free automaticly
//I just catch this error with coredump, so.....
process_thread[ct->thread_number] = malloc(sizeof(pthread_t));
// But I am not really sure this good solution, I think malloc at this place far far away from }good solution
} }
free(ct);
所以,现在它可以正常工作而不会崩溃 for((i=1;i<100000;i++)) do echo "Hi $i" |nc 127.0.0.1 2000;在多个终端
- 我无法重现您的第二期。 pthread_detach 的代码在我的机器上运行良好。
修复分段错误评论
我更改了 get_thread() 和 release_thread() 方法,还更改了 start_listener 和处理程序方法:
int empty_thread_sp = MAX_ACCEPT_THREAD - 1;
...
int get_thread()
{
while(1) {//My eyes bleeding, too....
pthread_mutex_lock(&lock);
if (empty_thread_sp >= 0) {
int thread;
printf("ThreadNO_in_Get=%d, index=%d\n",empty_thread[empty_thread_sp], empty_thread_sp);
thread = empty_thread[empty_thread_sp];
empty_thread_sp -= 1;
pthread_mutex_unlock(&lock);
return thread;
}
pthread_mutex_unlock(&lock);
}
}
int release_thread(struct client_thread *ct)
{
pthread_mutex_lock(&lock);
empty_thread_sp += 1;
empty_thread[empty_thread_sp] = ct->thread_number;
printf("ThreadNO_in_R=%d, index=%d\n", empty_thread[empty_thread_sp], empty_thread_sp);
pthread_mutex_unlock(&lock);
return 0;
}
void *handle_client(void *arg)
{
int no;
char buffer[1024];
memset(&buffer,0,sizeof buffer);
struct client_thread *ct = arg;
int n;
n = read(ct->cfd, buffer, 1024);
printf("%s", buffer);
write(ct->cfd, &buffer, strlen(buffer));
close(ct->cfd);
return NULL;
}
void *start_listen(void *args)
{
struct ip_args *listen_addr = args;
unsigned int sfd;
struct client_thread *ct;
struct sockaddr_in my_addr;
sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd == -1)
handle_err("socket");
printf("%d\n", sfd);
memset(&my_addr, 0, sizeof(my_addr));
my_addr.sin_family = AF_INET;
my_addr.sin_addr.s_addr = htonl(listen_addr->ipaddr);
my_addr.sin_port = htons(listen_addr->port);
struct sockaddr_in peer_addr;
if (bind(sfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) < 0)
handle_err("bind");
if (listen(sfd, 10) == -1)
handle_err("listen");
while (1)
{
ct= malloc(sizeof *ct);
socklen_t addr_size = sizeof peer_addr;
ct->cfd = accept(sfd, (struct sockaddr *)&peer_addr, &addr_size);
if (ct->cfd == 1)
handle_err("accept");
ct->thread_number = get_thread();
printf("Thread_number = %d\n", ct->thread_number);
if (pthread_create(&process_thread[ct->thread_number], NULL, handle_client, (void *)ct) != 0){
handle_err("pthread");
} else {
pthread_join(process_thread[ct->thread_number], NULL);
release_thread(ct);
}
free(ct);
}
}