如何在 C 中多次 "reuse" 一个线程?

how to "reuse" a thread multiple times in C?

我需要创建一个经理-工作服务器,其中工作人员一次只处理一个请求。在我想到的代码中,管理器将文件描述符存储在一个队列中;线程检索文件描述符并处理它的请求。

我的问题是,在当前代码中,一开始创建了N个线程,等待处理N个请求;但是一旦处理了 N 个请求,clientFun() 函数将不再运行,因为初始线程已完成其工作。

服务器代码:

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include "utils.h"
#include "conn.h"

#define DIM_BUFFER 100
#define N_THREADS 1

struct nodo
{
    int fd;
    struct nodo *prossimoPtr;
};
typedef struct nodo Nodo;
typedef Nodo *NodoPtr;

static Nodo *testaPtr = NULL;
static Nodo *codaPtr = NULL;

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t emptyFd = PTHREAD_COND_INITIALIZER;

unsigned int updateMaxSelect(int maxFd, fd_set set);
static void run_server(int pipeW2M_Read);
void push(NodoPtr *testaPtrF, NodoPtr *codaPtrF, int fdF);
static void *clientFun(void *pipeW2M_WriteF);
int pop(NodoPtr *lPtrF);
void gestioneCoda(int maxFdF, int fd, fd_set set);
void stampa(NodoPtr lPtrF);
void cleanup();

int main()
{
    cleanup();
    atexit(cleanup);

    int pipeW2M[2];
    SYSCALL(pipe(pipeW2M), "Errore: pipe(pipeW2M)")

    pthread_t threadFd[N_THREADS];
    for(int i = 0; i < N_THREADS; i++)
    {
        THREAD_CREATE(&threadFd[i], NULL, &clientFun, (void *) &pipeW2M[WRITE_END], "Thread setId")
    }

    run_server(pipeW2M[READ_END]);
    SYSCALL(close(pipeW2M[WRITE_END]), "Errore: close(pipeW2M[WRITE_END])")
    SYSCALL(close(pipeW2M[READ_END]), "Errore: close(pipeW2M[READ_END])")


    for(int i = 0; i < N_THREADS; i++)
    {
        THREAD_JOIN(threadFd[i], NULL, "Impossibile fare la join: seetId");
    }
}

static void run_server(int pipeW2M_Read)
{
    //Socket di connessione
    int fdSkt;
    RETURN_SYSCALL(fdSkt, socket(AF_UNIX, SOCK_STREAM, 0), "Errore creazione socket - fdSkt")
    struct sockaddr_un sckAddr;
    strncpy(sckAddr.sun_path, SOCKNAME, MAXBACKLOG);
    sckAddr.sun_family = AF_UNIX;
    SYSCALL(bind(fdSkt, (struct sockaddr *) &sckAddr, sizeof(sckAddr)), "Errore bind - fdSkt")
    SYSCALL(listen(fdSkt, SOMAXCONN), "Errore listen - fdSkt")

    //Massimo fd attivo
    int maxFd = fdSkt;

    //Inizializzazione set
    fd_set set, readSet;
    FD_ZERO(&set);
    FD_SET(fdSkt, &set); //FD_SET imposta a 1 il bit corrispondente a fdSkt
    FD_SET(pipeW2M_Read, &set);
    if(pipeW2M_Read > maxFd)
    {
        maxFd = pipeW2M_Read;
    }

    int fdSkt_accept;
    while(1)
    {
        readSet = set;
        SYSCALL(select(maxFd + 1, &readSet, NULL, NULL, NULL), "select(fd_num + 1, &rdset, NULL, NULL, NULL)")

        for(int i = 0; i <= maxFd; i++)
        {
            if (FD_ISSET(i, &readSet))
            {
                if (i == fdSkt)
                {
                    RETURN_SYSCALL(fdSkt_accept, accept(fdSkt, NULL, 0), "fdSkt_accept = accept(fdSkt, NULL, 0)")
                    FD_SET(fdSkt_accept, &set);
                    if (fdSkt_accept > maxFd) {
                        maxFd = fdSkt_accept;
                    }
                    continue;
                }

                if(i == pipeW2M_Read)
                {
                    int pipeFdSoccket;
                    SYSCALL(read(pipeW2M_Read, &pipeFdSoccket, sizeof(int)), "Errore")
                    printf("%d\n", pipeFdSoccket);

                    FD_SET(pipeFdSoccket, &set);
                    if(pipeFdSoccket > maxFd)
                        maxFd = pipeFdSoccket;

                    continue;
                }

                gestioneCoda(maxFd, i, set);
            }
        }
    }
    SYSCALL(close(fdSkt), "Errore close - fdSkt")
}

static void *clientFun(void *pipeW2M_WriteF)
{
    puts("Entro");
    int pipeW2M_Write = *((int *) pipeW2M_WriteF);

    LOCK(&mutex)
    while (testaPtr == NULL)
    {
        WAIT(&emptyFd, &mutex)
    }
    int fdAccept = pop(&testaPtr);
    printf("Fd in thread: %d\n", fdAccept);
    UNLOCK(&mutex)

    char buffer[DIM_BUFFER];
    memset(buffer, '[=11=]', DIM_BUFFER);

    int lenghtRead;
    RETURN_SYSCALL(lenghtRead, read(fdAccept, buffer, DIM_BUFFER), "Errore: read(fdSkt_com, buffer, DIM_BUFFER)")

    if(lenghtRead == 0)
    {
        SYSCALL(write(pipeW2M_Write, &fdAccept, sizeof(int)), "Erroer: write(pipeW2M_Write, &fdAccept, sizeof(int))")
        return NULL;
    }

    //lenghtRead comprende conta tutti i caretteri letti (compreso il '[=11=]' se è presente)
    for(int i = 0; i < lenghtRead-1; i++)
    {
        buffer[i] = toupper((unsigned char) buffer[i]);
    }
    SYSCALL(writen(fdAccept, buffer, lenghtRead), "Errore: writen(fdSkt_com, buffer, lengthBuffer)")

    SYSCALL(write(pipeW2M_Write, &fdAccept, sizeof(int)), "Erroer: write(pipeW2M_Write, &fdAccept, sizeof(int))")
    puts("Esco");
    return NULL;

}

unsigned int updateMaxSelect(int maxFd, fd_set set)
{
    for(int i = maxFd - 1; i >= 0; i--)
    {
        if(FD_ISSET(i, &set))
        {
            return i;
        }
    }
    return -1;
}

void gestioneCoda(int maxFdF, int fd, fd_set set)
{
    LOCK(&mutex)

    push(&testaPtr, &codaPtr, fd);
    FD_CLR(fd, &set);
    if(fd == maxFdF)
        maxFdF = updateMaxSelect(fd, set);

    SIGNAL(&emptyFd)
    UNLOCK(&mutex)
}

void push(NodoPtr *testaPtrF, NodoPtr *codaPtrF, int fdF)
{
    NodoPtr nuovoPtr = NULL;
    RETURN_NULL_SYSCALL(nuovoPtr, malloc(sizeof(Nodo)), "nuovoPtr = malloc(sizeof(Nodo))")

    nuovoPtr->fd = fdF;
    nuovoPtr->prossimoPtr = NULL;

    if(*testaPtrF == NULL)
    {
        *testaPtrF = nuovoPtr;
        *codaPtrF = nuovoPtr;
    }
    else
    {
        (*codaPtrF)->prossimoPtr = nuovoPtr;
        *codaPtrF = nuovoPtr;
    }
}

int pop(NodoPtr *lPtrF)
{
    if(*lPtrF != NULL)
    {
        int value = (*lPtrF)->fd;
        NodoPtr tempPtr = *lPtrF;
        *lPtrF = (*lPtrF)->prossimoPtr;
        free(tempPtr);

        return value;
    }
    else
    {
        puts("la lista è vuota");
        exit(EXIT_FAILURE);
    }
}

void stampa(NodoPtr lPtrF)
{
    if(lPtrF != NULL)
    {
        printf("Parola: %d\n", lPtrF->fd);

        stampa(lPtrF->prossimoPtr);
    }
    else
        puts("NULL");
}

void cleanup()
{
    unlink(SOCKNAME);
}

客户代码:

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include "utils.h"
#include "conn.h"
//Librerie per i socket:
#include <sys/socket.h>
#include <sys/un.h>

#define DIM_BUFFER 256


int main()
{
    int fdSkt;
    //Creazione socket - si usano (quasi) sempre questi parametri
    RETURN_SYSCALL(fdSkt, socket(AF_UNIX, SOCK_STREAM, 0), "Errore creazione socket - fdSkt")


    //Connect
    //sckAddr deve essere uguale a quello del server
    struct sockaddr_un sckAddr;
    strncpy(sckAddr.sun_path, SOCKNAME, 108);
    sckAddr.sun_family = AF_UNIX;

    //il socket potrebbe non aver ancora fatto la listen (per via dello scheduler)
    //il prof nelle correzioni mette solamente: SYSCALL(connect(fdSkt, (struct sockaddr *) &sckAddr, sizeof(sckAddr)), "")
    while(connect(fdSkt, (struct sockaddr *) &sckAddr, sizeof(sckAddr)) == -1)
    {
        puts("Bloccato");
        if(errno != ENOENT)
        {
            perror("Errore connect - fdSkt");
            exit(EXIT_FAILURE);
        }
    }

    while (1)
    {
        char buffer[DIM_BUFFER];
        memset(buffer, '[=12=]', DIM_BUFFER);
        SCANF_STRINGA(buffer);

        if(strncmp(buffer, "quit", strlen("quit")) == 0) {
            break;
        }

        int lenghtBuffer = strlen(buffer)+1;
        SYSCALL(writen(fdSkt, buffer, lenghtBuffer), "Errore: writen(fdSkt, buffer, strlen(buffer)+1)")

        SYSCALL(readn(fdSkt, buffer, lenghtBuffer), "Errore: writen(fdSkt, buffer, strlen(buffer)+1)")
        printf("%s\n", buffer);

    }

    SYSCALL(close(fdSkt), "Errore close - fdSkt")

    return 0;
}

Utils.h代码:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>

#define READ_END 0
#define WRITE_END 1

#define RETURN_SYSCALL(r,c,e) if((r=c)==-1) { perror(e);exit(errno); }
#define SYSCALL(c,e) if(c==-1) { perror(e);exit(errno);}
#define THREAD_CREATE(a, b, c, d, text) if(pthread_create(a, b, c, d) != 0) { perror(text);exit(EXIT_FAILURE);}
#define THREAD_JOIN(a, b, text) if(pthread_join(a, b) != 0) { perror(text);exit(EXIT_FAILURE);}
//usare con le funzioni che ritornano NULL quando falliscono e di cui si vuole memorizzare il valore di ritorno (es: fopen)
#define RETURN_NULL_SYSCALL(retrunVar, fun, text) if((retrunVar=fun) == NULL) { perror(text);exit(errno); }
//usare per le syscall che quando falliscono ritornano un valore != 0
#define SYSCALL_ZERO(syscall, text) if(syscall != 0) {perror(text);exit(errno);}

#define LOCK(l)                                         \
if (pthread_mutex_lock(l) != 0)                         \
{                                                       \
    fprintf(stderr, "ERRORE FATALE lock\n");            \
    pthread_exit((void*)EXIT_FAILURE);                  \
}

#define UNLOCK(l)                                       \
if (pthread_mutex_unlock(l) != 0)                       \
{                                                       \
    fprintf(stderr, "ERRORE FATALE unlock\n");          \
    pthread_exit((void*)EXIT_FAILURE);                  \
}

#define SIGNAL(c)                                       \
if (pthread_cond_signal(c) != 0)                        \
{                                                       \
    fprintf(stderr, "ERRORE FATALE signal\n");          \
    pthread_exit((void*)EXIT_FAILURE);                  \
}

#define WAIT(c, l)                                      \
if (pthread_cond_wait(c,l) != 0)                        \
{                                                       \
    fprintf(stderr, "ERRORE FATALE wait\n");            \
    pthread_exit((void*)EXIT_FAILURE);                  \
}

#define SCANF_STRINGA(stringa)                \
if(scanf("%s", stringa) == 0)                 \
{                                             \
    perror("Impossibile leggere la stringa"); \
    exit(EXIT_FAILURE);                       \
}

Conn.h代码:

#if !defined(CONN_H)
#define CONN_H

#include <sys/types.h> 
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>

#define SOCKNAME "./cs_sock"
#define MAXBACKLOG 108

/** Evita letture parziali
 *
 *   \retval -1   errore (errno settato)
 *   \retval  0   se durante la lettura da fd leggo EOF
 *   \retval size se termina con successo
 */
static inline int readn(long fd, void *buf, size_t size) {
    size_t left = size;
    int r;
    char *bufptr = (char*)buf;
    while(left>0) {
    if ((r=read((int)fd ,bufptr,left)) == -1) {
        if (errno == EINTR) continue;
        return -1;
    }
    if (r == 0) return 0;   // EOF
        left    -= r;
    bufptr  += r;
    }
    return size;
}

/** Evita scritture parziali
 *
 *   \retval -1   errore (errno settato)
 *   \retval  0   se durante la scrittura la write ritorna 0
 *   \retval  1   se la scrittura termina con successo
 */
static inline int writen(long fd, void *buf, size_t size) {
    size_t left = size;
    int r;
    char *bufptr = (char*)buf;
    while(left>0) {
    if ((r=write((int)fd ,bufptr,left)) == -1) {
        if (errno == EINTR) continue;
        return -1;
    }
    if (r == 0) return 0;  
        left    -= r;
    bufptr  += r;
    }
    return 1;
}


#endif /* CONN_H */

线程通常不会运行完成;相反,它们在执行循环中等待某些触发事件,例如消息或信号量的到达。一个典型的thred函数的模式是:

void* func( void* arg )
{
    // Thread initialisation

    while( !terminated )
    {
        // Block waiting
        ...
 
        // Do stuff (handle event/message for example)
        ...
    }

    // Clean-up
    ...
}

线程函数的签名取决于线程library/OS;例如,以上对于 pthreads 是正常的。此外,一个线程可以 运行 无限期地使用 while(1)for(;;) 循环。确切的终止机制(即我示例中的 terminate 由您决定;这不是必需的。

线程循环通常是状态机的实现。

您当然可以有一个 运行-to-completion 线程,但在那种情况下,您必须为每个事件创建一个新线程 - 这样做效率很低。

阻塞调用可以像 sleep() 调用周期性任务一样简单。可以在整个任务中的多个位置进行阻塞,但这样做会使设计和调试变得更加复杂,状态机模式通常是更好的解决方案。

既然 Clifford 提供了一般信息,我将解决您计划中的具体问题并提出适当的更改建议。

  • 实际上,您的 clientFun 不仅无法处理多个(连续的)客户端连接,而且无法处理来自一个连接的多个单独消息(在多个线程中处理来自一个连接的消息是不明智,因为客户端无论如何都不是为发送并行请求而设计的);作为第一步,我们需要围绕一条消息的处理进行循环,即。 e.绕行

        RETURN_SYSCALL(lenghtRead, read(fdAccept, buffer, DIM_BUFFER), "Errore: read(fdSkt_com, buffer, DIM_BUFFER)")
    …
        SYSCALL(writen(fdAccept, buffer, lenghtRead), "Errore: writen(fdSkt_com, buffer, lengthBuffer)")
    

    放置一个for (; ; ) { … }循环并将其中的return NULL;替换为break;

  • 现在为了处理多个连接,我们需要一个外部 for (; ; ) { … } 循环

        LOCK(&mutex)
    …
        SYSCALL(write(pipeW2M_Write, &fdAccept, sizeof(int)), "Erroer: write(pipeW2M_Write, &fdAccept, sizeof(int))")
    
  • 要配合这些更改,需要在 run_server 中进行调整。服务器不再需要监视已接受的套接字描述符,因此更改

                        FD_SET(fdSkt_accept, &set);
    

                        gestioneCoda(maxFd, fdSkt_accept, set);
    

    并去掉后面对gestioneCoda的调用,所以接受的套接字会立即传递给工作线程。终于改了

                        FD_SET(pipeFdSoccket, &set);
    

                        close(pipeFdSoccket);
    

    - 当一个连接完成时,它不能被添加到监视列表,而是关闭以避免套接字描述符泄漏。