如何终止线程池中的所有预分配线程?
how to terminates all the preallocated threads in a threadpool?
我已经使用下面的结构创建了一个线程池,现在的问题是如何让所有预分配线程正常结束?
std::vector<pthread_t> preallocatedThreadsPool; // threadpool
std::queue<int> tcpQueue; // a queue to hold my task
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condition_var = PTHREAD_COND_INITIALIZER;
void* threadFunctionUsedByThreadsPool(void *arg);
main () {
preallocatedThreadsPool.resize(preallocatThreadsNumber);
for(pthread_t i : preallocatedThreadsPool) {
pthread_create(&i, NULL, threadFunctionUsedByThreadsPool, NULL);
}
pthread_mutex_lock(&mutex); // one thread mess with the queue at one time
tcpQueue.push(task);
pthread_cond_signal(&condition_var);
pthread_mutex_unlock(&mutex);
}
void* threadFunctionUsedByThreadsPool(void *arg) {
while (true) {
pthread_mutex_lock(&mutex);
if (tcpQueue.empty()) { // can't get work from the queue then just wait
pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
task = tcpQueue.front();
tcpQueue.pop();
}
pthread_mutex_unlock(&mutex);
if (task) {
// do task
}
}
return NULL;
}
这个问题找了好几天还是没找到好的解决办法,我试过的最接近的是,当程序要退出的时候,把一个特殊的item推入队列,然后在threadFunctionUsedByThreadsPool里面,当检测到这样的项目,我会调用pthread_join,但是,当我使用gdb工具调试它时,那些预分配的线程仍然存在,任何人都可以提供帮助,最好是一些代码,例如,我如何修改threadFunctionUsedByThreadsPool,以便我可以正确退出所有预分配的线程?
非常感谢!!!
TLDR:您只需要一个线程安全变量,所有线程都可以检查工作项之间的退出条件。使用 pthread_join
等待线程退出。
首先,让我们在线程函数中正确设置条件变量的 while 循环。
而不是这个:
pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
task = tcpQueue.front();
tcpQueue.pop();
检查条件变量唤醒前后队列的状态。虚假唤醒是真实存在的,不能保证另一个线程不会唤醒并获取最后一个工作项。您绝对不想从空队列中弹出。
更好:
while (tcpQueue.empty()) {
pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
}
task = tcpQueue.front();
tcpQueue.pop();
解决了这个问题后,我们可以引入一个新的全局布尔值来表示停止条件:
bool stopCondition = false;
每当我们想要告诉池中的所有线程停止时,我们可以将 stopCondition
设置为 true
并向条件变量发出信号以提醒所有线程状态发生变化。读取或写入 stopCondition 应该在锁下完成。 (我想你也可以使用 std::atomic<bool>
)
把它们放在一起,你的线程函数就变成了这样:
void* threadFunctionUsedByThreadsPool(void* arg) {
pthread_mutex_lock(&mutex);
while (!stopCondition) {
// wait for a task to be queued
while (tcpQueue.empty() && !stopCondition) {
pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
}
if (stopCondition == false) {
task = tcpQueue.front();
tcpQueue.pop();
// exit lock while operating on a task
pthread_mutex_unlock(&mutex);
if (task) {
// do task
}
// re-acquire the lock
pthread_mutex_lock(&mutex);
}
}
// release the lock before exiting the function
pthread_mutex_unlock(&mutex);
return NULL;
}
然后是一个辅助函数,用于通知所有线程退出并等待每个线程停止。请注意,我们使用 pthread_cond_broadcast
通知所有线程从它们的条件变量等待中唤醒,而不是 pthread_cond_signal
只唤醒一个线程。
void stopThreadPool()
{
// signal all threads to exit after they finish their current work item
pthread_mutex_lock(&mutex);
stopCondition = true;
pthread_cond_broadcast(&condition_var); // notify all threads
pthread_mutex_unlock(&mutex);
// wait for all threads to exit
for (auto& t : preAllocatedThreadsPool) {
pthread_join(t, nullptr);
}
preAllocatedThreadsPool.clear();
}
我刚刚发现的最后一个错误 - 你的 main
没有像你想象的那样 属性 初始化你的 preAllocatedThreadsPool
向量。您正在制作 pthread_t 的副本,而不是实际使用向量中的句柄。
而不是这个:
for(pthread_t i : preallocatedThreadsPool) {
您的循环需要通过引用枚举:
更好:
for(pthread_t &i : preallocatedThreadsPool) {
发送指示池线程重新排队任务然后终止的任务。然后毒任务将 运行 遍历池中的所有线程,将它们全部杀死。我使用 null 作为毒药,(即非法任务)——当它杀死最后一个线程时不需要销毁它。您可能希望在发送 null/whatever 之前清除任务队列。如果使用 null,则只需要在任务出列之后在线程中进行 null 检查。
您只需要很少的额外代码,您不需要知道池中有多少个线程,它就可以工作:)
我已经使用下面的结构创建了一个线程池,现在的问题是如何让所有预分配线程正常结束?
std::vector<pthread_t> preallocatedThreadsPool; // threadpool
std::queue<int> tcpQueue; // a queue to hold my task
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condition_var = PTHREAD_COND_INITIALIZER;
void* threadFunctionUsedByThreadsPool(void *arg);
main () {
preallocatedThreadsPool.resize(preallocatThreadsNumber);
for(pthread_t i : preallocatedThreadsPool) {
pthread_create(&i, NULL, threadFunctionUsedByThreadsPool, NULL);
}
pthread_mutex_lock(&mutex); // one thread mess with the queue at one time
tcpQueue.push(task);
pthread_cond_signal(&condition_var);
pthread_mutex_unlock(&mutex);
}
void* threadFunctionUsedByThreadsPool(void *arg) {
while (true) {
pthread_mutex_lock(&mutex);
if (tcpQueue.empty()) { // can't get work from the queue then just wait
pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
task = tcpQueue.front();
tcpQueue.pop();
}
pthread_mutex_unlock(&mutex);
if (task) {
// do task
}
}
return NULL;
}
这个问题找了好几天还是没找到好的解决办法,我试过的最接近的是,当程序要退出的时候,把一个特殊的item推入队列,然后在threadFunctionUsedByThreadsPool里面,当检测到这样的项目,我会调用pthread_join,但是,当我使用gdb工具调试它时,那些预分配的线程仍然存在,任何人都可以提供帮助,最好是一些代码,例如,我如何修改threadFunctionUsedByThreadsPool,以便我可以正确退出所有预分配的线程? 非常感谢!!!
TLDR:您只需要一个线程安全变量,所有线程都可以检查工作项之间的退出条件。使用 pthread_join
等待线程退出。
首先,让我们在线程函数中正确设置条件变量的 while 循环。
而不是这个:
pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
task = tcpQueue.front();
tcpQueue.pop();
检查条件变量唤醒前后队列的状态。虚假唤醒是真实存在的,不能保证另一个线程不会唤醒并获取最后一个工作项。您绝对不想从空队列中弹出。
更好:
while (tcpQueue.empty()) {
pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
}
task = tcpQueue.front();
tcpQueue.pop();
解决了这个问题后,我们可以引入一个新的全局布尔值来表示停止条件:
bool stopCondition = false;
每当我们想要告诉池中的所有线程停止时,我们可以将 stopCondition
设置为 true
并向条件变量发出信号以提醒所有线程状态发生变化。读取或写入 stopCondition 应该在锁下完成。 (我想你也可以使用 std::atomic<bool>
)
把它们放在一起,你的线程函数就变成了这样:
void* threadFunctionUsedByThreadsPool(void* arg) {
pthread_mutex_lock(&mutex);
while (!stopCondition) {
// wait for a task to be queued
while (tcpQueue.empty() && !stopCondition) {
pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
}
if (stopCondition == false) {
task = tcpQueue.front();
tcpQueue.pop();
// exit lock while operating on a task
pthread_mutex_unlock(&mutex);
if (task) {
// do task
}
// re-acquire the lock
pthread_mutex_lock(&mutex);
}
}
// release the lock before exiting the function
pthread_mutex_unlock(&mutex);
return NULL;
}
然后是一个辅助函数,用于通知所有线程退出并等待每个线程停止。请注意,我们使用 pthread_cond_broadcast
通知所有线程从它们的条件变量等待中唤醒,而不是 pthread_cond_signal
只唤醒一个线程。
void stopThreadPool()
{
// signal all threads to exit after they finish their current work item
pthread_mutex_lock(&mutex);
stopCondition = true;
pthread_cond_broadcast(&condition_var); // notify all threads
pthread_mutex_unlock(&mutex);
// wait for all threads to exit
for (auto& t : preAllocatedThreadsPool) {
pthread_join(t, nullptr);
}
preAllocatedThreadsPool.clear();
}
我刚刚发现的最后一个错误 - 你的 main
没有像你想象的那样 属性 初始化你的 preAllocatedThreadsPool
向量。您正在制作 pthread_t 的副本,而不是实际使用向量中的句柄。
而不是这个:
for(pthread_t i : preallocatedThreadsPool) {
您的循环需要通过引用枚举:
更好:
for(pthread_t &i : preallocatedThreadsPool) {
发送指示池线程重新排队任务然后终止的任务。然后毒任务将 运行 遍历池中的所有线程,将它们全部杀死。我使用 null 作为毒药,(即非法任务)——当它杀死最后一个线程时不需要销毁它。您可能希望在发送 null/whatever 之前清除任务队列。如果使用 null,则只需要在任务出列之后在线程中进行 null 检查。
您只需要很少的额外代码,您不需要知道池中有多少个线程,它就可以工作:)