这是使用 pthread 的正确方法吗?
Is this the correct way to use pthread?
我正在尝试使用多线程创建 HTTP 服务器。 main() 将 client_sock 从 accept() 移交给其中一个工作线程。如果没有可用的工作线程,它会一直等到有一个可用。我被限制为无法在工作线程中调用 accept()。到目前为止,这是我的代码的一部分。我的一些问题是:
- 我是否需要像现在一样使用 2 个 pthread 互斥锁和条件变量?
- 在这些情况下我是否需要使用 pthread lock 或 unlock?
- 如果我想在服务器上创建文件时添加互斥锁,我是否必须创建另一个互斥变量或者现有的互斥变量之一是否有效?
#include <iostream>
#include <err.h>
#include <fcntl.h>
#include <netdb.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <getopt.h>
#include <pthread.h>
#define SIZE 1024
struct shared_data
{
int redundancy;
int client_sock;
int working_threads;
int dispatch_ready;
pthread_mutex_t* dispatch_mutex;
pthread_mutex_t* worker_mutex;
pthread_cond_t* dispatch_cond;
pthread_cond_t* worker_cond;
};
void* receiveAndSend(void* obj)
{
struct shared_data* data = (struct shared_data*) obj;
int bytes;
char buff[SIZE + 1];
while(1)
{
while(!data->dispatch_ready)
{
pthread_cond_wait(data->dispatch_cond, data->dispatch_mutex);
}
data->dispatch_ready = 0;
data->working_threads++;
client_sock = data->client_sock;
bytes = recv(client_sock, buff, SIZE, 0);
// do work
data->working_threads--;
pthread_cond_signal(data->worker_cond);
}
}
int main(int argc, char* argv[])
{
if(argc < 2 || argc > 6)
{
char msg[] = "Error: invalid arg amount\n";
write(STDERR_FILENO, msg, strlen(msg));
exit(1);
}
char* addr = NULL;
unsigned short port = 80;
int num_threads = 4;
int redundancy = 0;
char opt;
while((opt = getopt(argc, argv, "N:r")) != -1)
{
if(opt == 'N')
{
num_threads = atoi(optarg);
if(num_threads < 1)
{
char msg[] = "Error: invalid input for -N argument\n";
write(STDERR_FILENO, msg, strlen(msg));
exit(1);
}
}
else if(opt == 'r')
{
redundancy = 1;
}
else
{
// error (getopt automatically sends an error message)
return 1;
}
}
// non-option arguments are always the last indexes of argv, no matter how they are written in the terminal
// optind is the next index of argv after all options
if(optind < argc)
{
addr = argv[optind];
optind++;
}
if(optind < argc)
{
port = atoi(argv[optind]);
}
if(addr == NULL)
{
char msg[] = "Error: no address specified\n";
write(STDERR_FILENO, msg, strlen(msg));
exit(1);
}
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = getaddr(addr);
serv_addr.sin_port = htons(port);
int serv_sock = socket(AF_INET, SOCK_STREAM, 0);
if(serv_sock < 0)
{
err(1, "socket()");
}
if(bind(serv_sock, (struct sockaddr*) &serv_addr, sizeof(serv_addr)) < 0)
{
err(1, "bind()");
}
if(listen(serv_sock, 500) < 0)
{
err(1, "listen()");
}
// Connecting with a client
struct sockaddr client_addr;
socklen_t client_addrlen;
pthread_mutex_t dispatch_mutex;
pthread_mutex_init(&dispatch_mutex, NULL);
pthread_mutex_t worker_mutex;
pthread_mutex_init(&worker_mutex, NULL);
pthread_cond_t dispatch_cond;
pthread_cond_init(&dispatch_cond, NULL);
pthread_cond_t worker_cond;
pthread_cond_init(&worker_cond, NULL);
struct shared_data data;
data.redundancy = redundancy;
data.dispatch_ready = 0;
data.working_threads = 0;
data.dispatch_mutex = &dispatch_mutex;
data.worker_mutex = &worker_mutex;
data.dispatch_cond = &dispatch_cond;
data.worker_cond = &worker_cond;
pthread_t* threads = new pthread_t[num_threads];
for (int i = 0; i < num_threads; i++)
{
pthread_create(&threads[i], NULL, receiveAndSend, &data);
}
while(1)
{
data.client_sock = accept(serv_sock, &client_addr, &client_addrlen);
while(data.working_threads == num_threads)
{
pthread_cond_wait(data.worker_cond, data.worker_mutex);
}
data.dispatch_ready = 1;
pthread_cond_signal(data.dispatch_cond);
}
return 0;
}
你的程序中有很多非常基本的错误,这很清楚地表明你不了解锁和条件变量(或指针的正确使用)。
锁保护一些共享数据。您只有一个共享数据项,因此您应该只需要一个锁(互斥锁)来保护它。
条件变量表示某些条件为真。您的用例的合理条件是 worker_available
和 work_available
。 (将条件变量命名为 dispatch_cond
和 worker_cond
无助于清晰。)
一个条件变量总是与一个互斥锁相关联,但是你不需要两个单独的互斥锁只是因为你有两个条件变量。
关于错误。
这段代码显然有问题:
while(1)
{
while(!data->dispatch_ready)
{
pthread_cond_wait(data->dispatch_cond, data->dispatch_mutex);
}
来自man pthread_cond_wait
:
atomically release mutex
and cause the calling thread to block on the condition variable cond
如果这个线程从未获取过互斥量,它如何释放它?
另外,这个线程如何在不获取互斥量的情况下读取 data->dispatch_ready
(与其他线程共享)?
此代码:
struct shared_data data;
data.redundancy = redundancy;
data.dispatch_ready = 0;
data.working_threads = 0;
data.dispatch_mutex = &dispatch_mutex;
data.worker_mutex = &worker_mutex;
data.dispatch_cond = &dispatch_cond;
data.worker_cond = &worker_cond;
没有错误,但有不必要的间接访问。您可以使 dispatch_mutex
和条件变量成为 shared_data
的 part,像这样:
struct shared_data
{
int redundancy;
int client_sock;
int working_threads;
int dispatch_ready;
pthread_mutex_t dispatch_mutex;
pthread_mutex_t worker_mutex;
pthread_cond_t dispatch_cond;
pthread_cond_t worker_cond;
};
这是我注意到的最微妙的错误:
data.client_sock = accept(serv_sock, &client_addr, &client_addrlen);
...
data.dispatch_ready = 1;
pthread_cond_signal(data.dispatch_cond);
在这里,您将唤醒 at least one 个等待 dispatch_cond
的线程,但可能会唤醒不止一个。如果多个线程被唤醒,它们都将在同一个 client_sock
上继续 recv
,这可能会带来灾难性的后果。
更新:
How do I fix this.
解决这个问题的最佳和最有效的方法可能是拥有一个由锁保护的“工作项”队列(例如使用带有头指针和尾指针的双链表)。
主线程将在尾部添加元素(同时持有锁),并发出“不为空”条件变量的信号。
工作线程将删除头元素(同时持有锁)。
当队列为空时,工作线程将阻塞在“非空”条件变量上。
主线程可能会在队列已满(所有工作人员都忙)时继续添加元素,或者它可能会阻塞等待工作人员变得可用,或者它可以 return“429 太多请求”到客户.
我正在尝试使用多线程创建 HTTP 服务器。 main() 将 client_sock 从 accept() 移交给其中一个工作线程。如果没有可用的工作线程,它会一直等到有一个可用。我被限制为无法在工作线程中调用 accept()。到目前为止,这是我的代码的一部分。我的一些问题是:
- 我是否需要像现在一样使用 2 个 pthread 互斥锁和条件变量?
- 在这些情况下我是否需要使用 pthread lock 或 unlock?
- 如果我想在服务器上创建文件时添加互斥锁,我是否必须创建另一个互斥变量或者现有的互斥变量之一是否有效?
#include <iostream>
#include <err.h>
#include <fcntl.h>
#include <netdb.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <getopt.h>
#include <pthread.h>
#define SIZE 1024
struct shared_data
{
int redundancy;
int client_sock;
int working_threads;
int dispatch_ready;
pthread_mutex_t* dispatch_mutex;
pthread_mutex_t* worker_mutex;
pthread_cond_t* dispatch_cond;
pthread_cond_t* worker_cond;
};
void* receiveAndSend(void* obj)
{
struct shared_data* data = (struct shared_data*) obj;
int bytes;
char buff[SIZE + 1];
while(1)
{
while(!data->dispatch_ready)
{
pthread_cond_wait(data->dispatch_cond, data->dispatch_mutex);
}
data->dispatch_ready = 0;
data->working_threads++;
client_sock = data->client_sock;
bytes = recv(client_sock, buff, SIZE, 0);
// do work
data->working_threads--;
pthread_cond_signal(data->worker_cond);
}
}
int main(int argc, char* argv[])
{
if(argc < 2 || argc > 6)
{
char msg[] = "Error: invalid arg amount\n";
write(STDERR_FILENO, msg, strlen(msg));
exit(1);
}
char* addr = NULL;
unsigned short port = 80;
int num_threads = 4;
int redundancy = 0;
char opt;
while((opt = getopt(argc, argv, "N:r")) != -1)
{
if(opt == 'N')
{
num_threads = atoi(optarg);
if(num_threads < 1)
{
char msg[] = "Error: invalid input for -N argument\n";
write(STDERR_FILENO, msg, strlen(msg));
exit(1);
}
}
else if(opt == 'r')
{
redundancy = 1;
}
else
{
// error (getopt automatically sends an error message)
return 1;
}
}
// non-option arguments are always the last indexes of argv, no matter how they are written in the terminal
// optind is the next index of argv after all options
if(optind < argc)
{
addr = argv[optind];
optind++;
}
if(optind < argc)
{
port = atoi(argv[optind]);
}
if(addr == NULL)
{
char msg[] = "Error: no address specified\n";
write(STDERR_FILENO, msg, strlen(msg));
exit(1);
}
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = getaddr(addr);
serv_addr.sin_port = htons(port);
int serv_sock = socket(AF_INET, SOCK_STREAM, 0);
if(serv_sock < 0)
{
err(1, "socket()");
}
if(bind(serv_sock, (struct sockaddr*) &serv_addr, sizeof(serv_addr)) < 0)
{
err(1, "bind()");
}
if(listen(serv_sock, 500) < 0)
{
err(1, "listen()");
}
// Connecting with a client
struct sockaddr client_addr;
socklen_t client_addrlen;
pthread_mutex_t dispatch_mutex;
pthread_mutex_init(&dispatch_mutex, NULL);
pthread_mutex_t worker_mutex;
pthread_mutex_init(&worker_mutex, NULL);
pthread_cond_t dispatch_cond;
pthread_cond_init(&dispatch_cond, NULL);
pthread_cond_t worker_cond;
pthread_cond_init(&worker_cond, NULL);
struct shared_data data;
data.redundancy = redundancy;
data.dispatch_ready = 0;
data.working_threads = 0;
data.dispatch_mutex = &dispatch_mutex;
data.worker_mutex = &worker_mutex;
data.dispatch_cond = &dispatch_cond;
data.worker_cond = &worker_cond;
pthread_t* threads = new pthread_t[num_threads];
for (int i = 0; i < num_threads; i++)
{
pthread_create(&threads[i], NULL, receiveAndSend, &data);
}
while(1)
{
data.client_sock = accept(serv_sock, &client_addr, &client_addrlen);
while(data.working_threads == num_threads)
{
pthread_cond_wait(data.worker_cond, data.worker_mutex);
}
data.dispatch_ready = 1;
pthread_cond_signal(data.dispatch_cond);
}
return 0;
}
你的程序中有很多非常基本的错误,这很清楚地表明你不了解锁和条件变量(或指针的正确使用)。
锁保护一些共享数据。您只有一个共享数据项,因此您应该只需要一个锁(互斥锁)来保护它。
条件变量表示某些条件为真。您的用例的合理条件是 worker_available
和 work_available
。 (将条件变量命名为 dispatch_cond
和 worker_cond
无助于清晰。)
一个条件变量总是与一个互斥锁相关联,但是你不需要两个单独的互斥锁只是因为你有两个条件变量。
关于错误。
这段代码显然有问题:
while(1)
{
while(!data->dispatch_ready)
{
pthread_cond_wait(data->dispatch_cond, data->dispatch_mutex);
}
来自man pthread_cond_wait
:
atomically release
mutex
and cause the calling thread to block on the condition variablecond
如果这个线程从未获取过互斥量,它如何释放它?
另外,这个线程如何在不获取互斥量的情况下读取 data->dispatch_ready
(与其他线程共享)?
此代码:
struct shared_data data;
data.redundancy = redundancy;
data.dispatch_ready = 0;
data.working_threads = 0;
data.dispatch_mutex = &dispatch_mutex;
data.worker_mutex = &worker_mutex;
data.dispatch_cond = &dispatch_cond;
data.worker_cond = &worker_cond;
没有错误,但有不必要的间接访问。您可以使 dispatch_mutex
和条件变量成为 shared_data
的 part,像这样:
struct shared_data
{
int redundancy;
int client_sock;
int working_threads;
int dispatch_ready;
pthread_mutex_t dispatch_mutex;
pthread_mutex_t worker_mutex;
pthread_cond_t dispatch_cond;
pthread_cond_t worker_cond;
};
这是我注意到的最微妙的错误:
data.client_sock = accept(serv_sock, &client_addr, &client_addrlen);
...
data.dispatch_ready = 1;
pthread_cond_signal(data.dispatch_cond);
在这里,您将唤醒 at least one 个等待 dispatch_cond
的线程,但可能会唤醒不止一个。如果多个线程被唤醒,它们都将在同一个 client_sock
上继续 recv
,这可能会带来灾难性的后果。
更新:
How do I fix this.
解决这个问题的最佳和最有效的方法可能是拥有一个由锁保护的“工作项”队列(例如使用带有头指针和尾指针的双链表)。
主线程将在尾部添加元素(同时持有锁),并发出“不为空”条件变量的信号。
工作线程将删除头元素(同时持有锁)。
当队列为空时,工作线程将阻塞在“非空”条件变量上。
主线程可能会在队列已满(所有工作人员都忙)时继续添加元素,或者它可能会阻塞等待工作人员变得可用,或者它可以 return“429 太多请求”到客户.