多线程 Web 服务器 pThread 问题

Multithreading Web Server pThread Issue

我正在做一个模拟客户请求并从不同文件运行进程的项目(这个进程是给我的,所以我不应该编辑它)。

我必须使用生产者-消费者方法来解决这个问题,并使用信号量和互斥锁来确保没有死锁等。我 运行 遇到了我的线程似乎没有意识到的问题当存在连接且未被触发以完成服务器上的作业时。该程序成功创建了线程,但我认为我只是遗漏了一些东西,或者不了解我上面提到的所有事情如何协同工作的基础知识。我想了解触发线程开始工作的逻辑,以及这对于等待客户端连接的侦听器/套接字系统有何意义。这是代码的细分,我不想包含所有代码,只是包含它如何工作的框架。

#define MAX_REQUEST 100

int port, numThread;
typedef int bufferItem;
bufferItem buffer[MAX_REQUEST]; // shared buffer
int bufferIndex;
bool connection = false;
pthread_t threads[MAX_REQUEST];
sem_t full;
sem_t empty;
pthread_mutex_t mutex;

int insertItem(bufferItem item) {
    // This method adds the item in the argument to the buffer and increments the buffer index.
    if(bufferIndex < MAX_REQUEST) {
        buffer[bufferIndex] = item;
        bufferIndex++;
        return(0); // success
    }
    else {
        return(-1); // error
    }
}

int removeItem() {
    bufferItem item;
    if(bufferIndex > 0) {
        bufferIndex--;
        item = buffer[bufferIndex];

        process(item); // run process (outside method) HERE

        return(0); // success
    }
    else {
        return(-1); // error
    }  
}

int req_handler() {
    // this method opens a socket and starts HTTP server listening on port
    // after connection to client is successful, following code executes
    while (1) {
        int s;
        s = accept(sock, NULL, NULL);
        if (s < 0) break;
        bool connection = true;
        printf("Connected!\n");

        //put in request for info (file descriptor) into shared buffer
        insertItem(s);
        sem_post(&empty); // I think I may have an error here, I have tried different things to trigger the thread but nothing works
    }
    // socket is closed
}

void *producer (void *arg) {
    // tests if there is a connection using a boolean
    // decrements buffer with empty semaphore
    // locks mutex
    int ret = insertItem(gettid()); // put item in buffer
    // unlocks mutex so other threads can use
    // increments buffer with full semaphore
}

void *consumer (void *arg) {
    // tests if there is a connection using a boolean
    // decrements full semaphore
    // locks mutex
    int ret = removeItem(); // put item in buffer
    // unlocks mutex
    // increments buffer with empty semaphore
}

int main(int argc, char *argv[]) {
    // sets up command line arguments to get port number and number of threads
    // initializes semaphores and mutex
    pthread_mutex_init(&mutex, NULL);
    sem_init(&empty, 0, MAX_REQUEST);
    sem_init(&full, 0, 0);

    int threadNum[numThread]; // keep track of threads

    for (i = 0; i < numThread; i++) { // create threads
        threadNum[i] = i;
        if (i == 0) {
            if(pthread_create(&threads[i], NULL, producer, (void *)&threadNum[i]) != 0) {
                perror("Producer: Thread creation error");
            }
        } else {
            if(pthread_create(&threads[i], NULL, consumer, (void *)&threadNum[i]) != 0) {
                perror("Consumer: Thread creation error");
            }
        }
        printf("Main: Thread Created :)\n");
    }

    // wait until all threads have exited (joined)
    for(i = 0; i < numThread; i++) {
        printf("Main: Thread Joined with numThread: %d and i %d \n",numThread,i);
        if(pthread_join(threads[i], NULL) != 0) {
            perror("Main: Thread join error");
        } 
    }

    req_handler(); // starts listener

    printf("All threads joined, destrying semaphores and mutex :)\n");

    // destroys semaphores and mutexes

    return 0;
}

您的信号量即将闲置。这不是编码问题,而是理解问题,我就这样回答。

我将解释为好像存储可以包含多个 (n) 项,但即使存储只能容纳一项也是如此。同样,我将解释它,因为有多个生产者 (p) 和多个消费者 (c),但即使有每个只有一个。但是,如果 n=1、p=1 和 c=1,则不需要互斥量。


数据流为:

生产者 -> 存储 -> 消费者

为了实现这一点,

  • 消费者需要等待数据在存储中可用。
  • 生产者需要等待 space 存储可用。

这可以使用两个信号量来实现:

  • 跟踪存储中的项目数量的一个。
  • 跟踪存储中可用 space 的数量。

逻辑:

  • 最初:

    • 存储中有 0 个项目。
    • n space 个项目。
  • 当生产者想要添加一个项目时:

    • 他们必须等待 space 可用。
  • 生产者生产物品后:

    • 他们必须通过增加存储物品的数量来表明这一点。
  • 当消费者想要消费一个项目时:

    • 他们必须等待商品上架。
  • 消费者消费商品后:

    • 他们必须通过增加 space 的存储量来表明这一点。

这些分别对应两个信号量的init、down/wait和up/post操作

实际上 reading/modifying 存储时使用互斥体(一旦信号量表明它处于所需状态)。