Is this the correct way to use pthread?

I am trying to create an HTTP server using multiple threads. main() hands the client_sock from accept() off to one of the worker threads. If no worker thread is available, it waits until one is. I am restricted from calling accept() in the worker threads. Here is part of my code so far. Some of my questions are:

  1. Do I need to use 2 pthread mutexes and condition variables the way I do now?
  2. Do I need to use pthread lock or unlock in these situations?
  3. If I want to add a mutex around creating files on the server, do I have to create another mutex variable, or would one of the existing ones work?
#include <iostream>
#include <stdlib.h>     // atoi(), exit()
#include <err.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h> // struct sockaddr_in, htons()
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <getopt.h>
#include <pthread.h>

#define SIZE 1024

struct shared_data
{
    int redundancy;
    int client_sock;
    int working_threads;
    int dispatch_ready;
    pthread_mutex_t* dispatch_mutex;
    pthread_mutex_t* worker_mutex;
    pthread_cond_t* dispatch_cond;
    pthread_cond_t* worker_cond;
};

void* receiveAndSend(void* obj)
{
    struct shared_data* data = (struct shared_data*) obj;

    int bytes;
    char buff[SIZE + 1];

    while(1)
    {
        while(!data->dispatch_ready)
        {
            pthread_cond_wait(data->dispatch_cond, data->dispatch_mutex);
        }
        data->dispatch_ready = 0;

        data->working_threads++;

        int client_sock = data->client_sock;

        bytes = recv(client_sock, buff, SIZE, 0);

        // do work

        data->working_threads--;
        pthread_cond_signal(data->worker_cond);
    }
}

int main(int argc, char* argv[])
{
    if(argc < 2 || argc > 6)
    {
        char msg[] = "Error: invalid arg amount\n";
        write(STDERR_FILENO, msg, strlen(msg));
        exit(1);
    }

    char* addr = NULL;
    unsigned short port = 80;
    int num_threads = 4;
    int redundancy = 0;
    int opt;  // getopt() returns int; char cannot reliably hold -1

    while((opt = getopt(argc, argv, "N:r")) != -1)
    {
        if(opt == 'N')
        {
            num_threads = atoi(optarg);

            if(num_threads < 1)
            {
                char msg[] = "Error: invalid input for -N argument\n";
                write(STDERR_FILENO, msg, strlen(msg));
                exit(1);
            }
        }
        else if(opt == 'r')
        {
            redundancy = 1;
        }
        else
        {
            // error (getopt automatically sends an error message)
            return 1;
        }
    }

    // non-option arguments are always the last indexes of argv, no matter how they are written in the terminal
    // optind is the next index of argv after all options
    if(optind < argc)
    {
        addr = argv[optind];
        optind++;
    }

    if(optind < argc)
    {
        port = atoi(argv[optind]);
    }

    if(addr == NULL)
    {
        char msg[] = "Error: no address specified\n";
        write(STDERR_FILENO, msg, strlen(msg));
        exit(1);
    }

    struct sockaddr_in serv_addr;
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = getaddr(addr);
    serv_addr.sin_port = htons(port);

    int serv_sock = socket(AF_INET, SOCK_STREAM, 0);
    if(serv_sock < 0)
    {
        err(1, "socket()");
    }

    if(bind(serv_sock, (struct sockaddr*) &serv_addr, sizeof(serv_addr)) < 0)
    {
        err(1, "bind()");
    }

    if(listen(serv_sock, 500) < 0)
    {
        err(1, "listen()");
    }

    // Connecting with a client
    struct sockaddr client_addr;
    socklen_t client_addrlen = sizeof(client_addr);  // accept() reads this as the buffer size

    pthread_mutex_t dispatch_mutex;
    pthread_mutex_init(&dispatch_mutex, NULL);
    pthread_mutex_t worker_mutex;
    pthread_mutex_init(&worker_mutex, NULL);
    pthread_cond_t dispatch_cond;
    pthread_cond_init(&dispatch_cond, NULL);
    pthread_cond_t worker_cond;
    pthread_cond_init(&worker_cond, NULL);

    struct shared_data data;

    data.redundancy = redundancy;
    data.dispatch_ready = 0;
    data.working_threads = 0;
    data.dispatch_mutex = &dispatch_mutex;
    data.worker_mutex = &worker_mutex;
    data.dispatch_cond = &dispatch_cond;
    data.worker_cond = &worker_cond;

    pthread_t* threads = new pthread_t[num_threads];
    for (int i = 0; i < num_threads; i++)
    {
        pthread_create(&threads[i], NULL, receiveAndSend, &data);
    }

    while(1)
    {
        data.client_sock = accept(serv_sock, &client_addr, &client_addrlen);

        while(data.working_threads == num_threads)
        {
            pthread_cond_wait(data.worker_cond, data.worker_mutex);
        }

        data.dispatch_ready = 1;
        pthread_cond_signal(data.dispatch_cond);
    }

    return 0;
}

There are quite a few very basic errors in your program, which make it clear that you don't understand locks and condition variables (or the correct use of pointers).

A lock protects some shared data. You have only one item of shared data, so you should need only one lock (mutex) to protect it.

A condition variable signals that some condition is true. Reasonable conditions for your use case would be worker_available and work_available. (Naming the condition variables dispatch_cond and worker_cond does not help clarity.)

A condition variable is always associated with a mutex, but you don't need two separate mutexes just because you have two condition variables.
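
For illustration, a minimal sketch of that arrangement (worker_available and work_available are illustrative names, not from your code): one mutex guarding the shared state, with both condition variables used together with it:

struct shared_data
{
    pthread_mutex_t lock;              // the single mutex protecting every field below
    pthread_cond_t  work_available;    // "main has handed over a new client_sock"
    pthread_cond_t  worker_available;  // "a worker has finished and is free again"
    int client_sock;
    int working_threads;
    int dispatch_ready;
};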


Now, about the errors.

This code is clearly broken:

    while(1)
    {
        while(!data->dispatch_ready)
        {
            pthread_cond_wait(data->dispatch_cond, data->dispatch_mutex);
        }

From man pthread_cond_wait:

atomically release mutex and cause the calling thread to block on the condition variable cond

If this thread never acquired the mutex, how can it release it?
Also, how can this thread read data->dispatch_ready (which is shared with other threads) without holding the mutex?
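
For comparison, the canonical pattern holds the mutex around both the test of the predicate and the wait. A sketch of what the top of your worker loop could look like (using your existing names):

pthread_mutex_lock(data->dispatch_mutex);
while(!data->dispatch_ready)
{
    // pthread_cond_wait() releases the mutex while blocking and
    // re-acquires it before returning.
    pthread_cond_wait(data->dispatch_cond, data->dispatch_mutex);
}
data->dispatch_ready = 0;
int client_sock = data->client_sock;   // copy the work item out while still holding the lock
pthread_mutex_unlock(data->dispatch_mutex);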

This code:

    struct shared_data data;

    data.redundancy = redundancy;
    data.dispatch_ready = 0;
    data.working_threads = 0;
    data.dispatch_mutex = &dispatch_mutex;
    data.worker_mutex = &worker_mutex;
    data.dispatch_cond = &dispatch_cond;
    data.worker_cond = &worker_cond;

is not wrong, but it has unnecessary indirection. You could make dispatch_mutex and the condition variables part of shared_data, like this:

struct shared_data
{
    int redundancy;
    int client_sock;
    int working_threads;
    int dispatch_ready;
    pthread_mutex_t dispatch_mutex;
    pthread_mutex_t worker_mutex;
    pthread_cond_t dispatch_cond;
    pthread_cond_t worker_cond;
};
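
With the members embedded, main() would then initialize them in place instead of through pointers, something like:

struct shared_data data;
pthread_mutex_init(&data.dispatch_mutex, NULL);
pthread_mutex_init(&data.worker_mutex, NULL);
pthread_cond_init(&data.dispatch_cond, NULL);
pthread_cond_init(&data.worker_cond, NULL);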

Here is the most subtle bug I noticed:

        data.client_sock = accept(serv_sock, &client_addr, &client_addrlen);
...
        data.dispatch_ready = 1;
        pthread_cond_signal(data.dispatch_cond);

Here you wake up at least one thread waiting on dispatch_cond, but possibly more than one. If more than one thread wakes up, they will all proceed to recv on the same client_sock, with potentially disastrous consequences.

Update:

How do I fix this.

Probably the best and most efficient way to fix this is to have a queue of "work items" protected by a lock (for example, a doubly linked list with head and tail pointers).

The main thread would add elements at the tail (while holding the lock) and signal a "not empty" condition variable.

Worker threads would remove the head element (while holding the lock).
When the queue is empty, worker threads would block on the "not empty" condition variable.

The main thread could keep adding elements while the queue is full (all workers busy), or it could block waiting for a worker to become available, or it could return "429 Too Many Requests" to the client.
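
A minimal sketch of such a queue (all names are illustrative; a singly linked list with head and tail pointers is already enough for FIFO push/pop):

struct work_item
{
    int client_sock;
    struct work_item* next;
};

struct work_queue
{
    pthread_mutex_t lock;
    pthread_cond_t  not_empty;
    struct work_item* head;
    struct work_item* tail;
};

// main(): append the accepted socket at the tail and signal "not empty".
void queue_push(struct work_queue* q, int client_sock)
{
    struct work_item* item = new work_item;
    item->client_sock = client_sock;
    item->next = NULL;

    pthread_mutex_lock(&q->lock);
    if(q->tail)
        q->tail->next = item;
    else
        q->head = item;
    q->tail = item;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

// worker: block until the queue is non-empty, then take the head element.
int queue_pop(struct work_queue* q)
{
    pthread_mutex_lock(&q->lock);
    while(q->head == NULL)
    {
        pthread_cond_wait(&q->not_empty, &q->lock);
    }
    struct work_item* item = q->head;
    q->head = item->next;
    if(q->head == NULL)
        q->tail = NULL;
    pthread_mutex_unlock(&q->lock);

    int client_sock = item->client_sock;
    delete item;
    return client_sock;
}

Each worker loop then reduces to client_sock = queue_pop(&q); handle the request; close(client_sock);, and main()'s loop to queue_push(&q, accept(...)). The mutex, the condition variable, and the head/tail pointers would be initialized once in main() before the workers are created.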