C 中简单线程池和 TCP 侦听器的问题

Problems with simple threadpool and TCP listener in C

我用简单的线程池逻辑在 c 中实现了一个多线程 TCP 侦听器,在多个终端中使用 for((i=1;i<100000;i++)) do echo "Hi $i" |nc 127.0.0.1 2000; 测试后我遇到了两个主要问题:

  1. 我会在超过 30000 个数据包后得到 'pthread: Cannot allocate memory',即使我使用 pthread_exit() 和 pthread_join(有 5gb 可用内存和 ps -hH 仅显示 4到 5 个线程)只有当我使用 pthread_detach(pthread_self()).
  2. 时它才会工作
  3. 当我使用 pthread_detach(pthread_self()) 时,我会在输出中得到一些额外的字符,在我对 [ 中的本地缓冲区变量使用 memset(&buffer,0,sizeof buffer) =31=] 功能一切正常,但我想我的代码中存在问题导致了这个问题。
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#define handle_err(msg)     \
    do                      \
    {                       \
        perror(msg);        \
        exit(EXIT_FAILURE); \
    } while (0)
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
#define MAX_ACCEPT_THREAD 200
uint empty_thread_sp = 1;
int empty_thread[MAX_ACCEPT_THREAD];
pthread_t *process_thread;
struct ip_args
{
    int port;
    int ipaddr;
};
struct client_thread
{
    unsigned int cfd;
    int thread_number;
};
int create_thread_pool()
{
    int i = 0;
    for (i = 0; i < MAX_ACCEPT_THREAD; i++)
    {
        empty_thread[i] = i;
    }
    process_thread =malloc(MAX_ACCEPT_THREAD * sizeof *process_thread);
}
int get_thread()
{

    if (empty_thread_sp < MAX_ACCEPT_THREAD)
    {
        pthread_mutex_lock(&lock);
        empty_thread_sp++;
        printf("ThreadNO_in_Get=%d\n", empty_thread[empty_thread_sp]);
        pthread_mutex_unlock(&lock);
        return empty_thread[empty_thread_sp];
    }
    else
    {
        return get_thread();
    }
}
int release_thread(struct client_thread *ct)
{
    if (empty_thread_sp > 0)
    {
        pthread_mutex_lock(&lock);
        empty_thread[--empty_thread_sp] = ct->thread_number;
        printf("ThreadNO_in_R=%d\n", empty_thread[empty_thread_sp]);
        pthread_mutex_unlock(&lock);
    }
    else
    {
        return 0;
    }
}
void *handle_client(void *arg)
{
    pthread_detach(pthread_self());
    int no;
    char buffer[1024];
    //TODO is this ok?
    memset(&buffer,0,sizeof buffer);
    struct client_thread *ct = arg;
    int n;
    n = read(ct->cfd, buffer, 1024);
    printf("%s", buffer);
    write(ct->cfd, &buffer, strlen(buffer));
    close(ct->cfd);
    release_thread(ct);
    no = ct->thread_number;
    free(ct);
    //TODO the following lines do not work
    //pthread_exit(NULL);
    //pthread_join(process_thread[no],NULL);
    
}
void *start_listen(void *args)
{
    struct ip_args *listen_addr = args;
    unsigned int sfd;
    struct client_thread *ct; 
    struct sockaddr_in my_addr;
    sfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sfd == -1)
        handle_err("socket");
    printf("%d\n", sfd);
    memset(&my_addr, 0, sizeof(my_addr));
    my_addr.sin_family = AF_INET;
    my_addr.sin_addr.s_addr = htonl(listen_addr->ipaddr);
    my_addr.sin_port = htons(listen_addr->port);
    struct sockaddr_in peer_addr;
    if (bind(sfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) < 0)
        handle_err("bind");
    if (listen(sfd, 10) == -1)
        handle_err("listen");

    while (1)
    {
        ct= malloc(sizeof *ct); 
        socklen_t addr_size = sizeof peer_addr;
        ct->cfd = accept(sfd, (struct sockaddr *)&peer_addr, &addr_size);
        if (ct->cfd == 1)
            handle_err("accept");

        ct->thread_number = 0;
        ct->thread_number = get_thread();

        printf("Thread_number = %d\n", ct->thread_number);

        if (pthread_create(&process_thread[ct->thread_number], NULL, handle_client, (void *)ct) != 0)
            handle_err("pthread");
    }
}
int main()
{
    create_thread_pool();
    /* 1- socket 2-bind 3-listen 4-accept*/
    struct ip_args listen_addr1, listen_addr2, control_addr;
    listen_addr1.ipaddr = INADDR_ANY;
    listen_addr1.port = 2000;
    listen_addr2.ipaddr = INADDR_ANY;
    listen_addr2.port = 3000;
    control_addr.ipaddr = INADDR_LOOPBACK;
    control_addr.port = 57000;
    pthread_t listen_thread[3];
    pthread_create(&listen_thread[0], NULL, start_listen, (void *)&listen_addr1);
    pthread_create(&listen_thread[1], NULL, start_listen, (void *)&listen_addr2);
    //pthread_create(&listen_thread[2], NULL, start_listen, (void *)&control_addr);
    start_listen((void *)&control_addr);
    return 0;
}

已编辑: @terehpp 示例工作正常但它只创建 3 个进程线程,基于 @Martin James 和 @terehpp 建议我应该改变我的设计。 关于基于@terehpp 提示的第二个问题,我应该在本地非静态变量上使用 memset(&buffer,0,sizeof buffer)。

如果我们只讨论 pthread_join 问题而不涉及设计问题:

  1. 你应该调用 pthread_join 尽可能接近 pthread_create (如果你有可连接的线程)。 我删除 pthread_detach 并在 start_listen 函数中的 pthread_create 之后调用 pthread_join。 您还应该在分配资源的函数中释放资源。 所以我更换 free(ct);handle_clientstart_listen

像这样

     if (pthread_create(&process_thread[ct->thread_number], NULL, handle_client, (void *)ct) != 0) {                 
         handle_err("pthread");                                                                                      
     } else {                                                                                                        
         pthread_join(process_thread[ct->thread_number], NULL);
         // As long as we have threadpool we should reallocate resources for 
         // process_thread[ct->thread_number]
         // because after pthread_join it will be free automaticly
         //I just catch this error with coredump, so.....
         process_thread[ct->thread_number] = malloc(sizeof(pthread_t));
         // But I am not really sure this good solution, I think malloc at this place far far away from }good solution                                                                                    
     }                   }                                                                                            
     free(ct);

所以,现在它可以正常工作而不会崩溃 for((i=1;i<100000;i++)) do echo "Hi $i" |nc 127.0.0.1 2000;在多个终端

  1. 我无法重现您的第二期。 pthread_detach 的代码在我的机器上运行良好。

修复分段错误评论

我更改了 get_thread() 和 release_thread() 方法,还更改了 start_listener 和处理程序方法:

 int empty_thread_sp = MAX_ACCEPT_THREAD - 1;
 ...
 int get_thread()
 {
    while(1) {//My eyes bleeding, too....
        pthread_mutex_lock(&lock);
        if (empty_thread_sp >= 0) {
            int thread;
            printf("ThreadNO_in_Get=%d, index=%d\n",empty_thread[empty_thread_sp], empty_thread_sp);
            thread = empty_thread[empty_thread_sp];
            empty_thread_sp -= 1;

            pthread_mutex_unlock(&lock);
            return thread;
         }

         pthread_mutex_unlock(&lock);
    }
 }

 int release_thread(struct client_thread *ct)
 {
     pthread_mutex_lock(&lock);
     empty_thread_sp += 1;
     empty_thread[empty_thread_sp] = ct->thread_number;
     printf("ThreadNO_in_R=%d, index=%d\n", empty_thread[empty_thread_sp], empty_thread_sp);
     pthread_mutex_unlock(&lock);
     return 0;
 }

 void *handle_client(void *arg)
 {
     int no;
     char buffer[1024];
     memset(&buffer,0,sizeof buffer);
     struct client_thread *ct = arg;
     int n;
     n = read(ct->cfd, buffer, 1024);
     printf("%s", buffer);
     write(ct->cfd, &buffer, strlen(buffer));
     close(ct->cfd);
     return NULL;
  }
  
  void *start_listen(void *args)
  {
     struct ip_args *listen_addr = args;
     unsigned int sfd;
     struct client_thread *ct; 
     struct sockaddr_in my_addr;
     sfd = socket(AF_INET, SOCK_STREAM, 0);
     if (sfd == -1)
         handle_err("socket");
     printf("%d\n", sfd);
     memset(&my_addr, 0, sizeof(my_addr));
     my_addr.sin_family = AF_INET;
     my_addr.sin_addr.s_addr = htonl(listen_addr->ipaddr);
     my_addr.sin_port = htons(listen_addr->port);
     struct sockaddr_in peer_addr;
     if (bind(sfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) < 0)
         handle_err("bind");
     if (listen(sfd, 10) == -1)
         handle_err("listen");

     while (1)
     {
         ct= malloc(sizeof *ct); 
         socklen_t addr_size = sizeof peer_addr;
         ct->cfd = accept(sfd, (struct sockaddr *)&peer_addr, &addr_size);
         if (ct->cfd == 1)
             handle_err("accept");

         ct->thread_number = get_thread();

         printf("Thread_number = %d\n", ct->thread_number);

         if (pthread_create(&process_thread[ct->thread_number], NULL, handle_client, (void *)ct) != 0){
             handle_err("pthread");
         } else {
             pthread_join(process_thread[ct->thread_number], NULL);
             release_thread(ct);
         }
         free(ct);
     }
  }