如何终止线程池中的所有预分配线程?

how to terminates all the preallocated threads in a threadpool?

我已经使用下面的结构创建了一个线程池,现在的问题是如何让所有预分配线程正常结束?

std::vector<pthread_t> preallocatedThreadsPool; // threadpool
std::queue<int> tcpQueue;  // a queue to hold my task

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condition_var = PTHREAD_COND_INITIALIZER;

void* threadFunctionUsedByThreadsPool(void *arg);

main () {
    preallocatedThreadsPool.resize(preallocatThreadsNumber);
    for(pthread_t i : preallocatedThreadsPool) {
        pthread_create(&i, NULL, threadFunctionUsedByThreadsPool, NULL);
    }


    pthread_mutex_lock(&mutex); // one thread mess with the queue at one time
 
    tcpQueue.push(task);

    pthread_cond_signal(&condition_var);
    pthread_mutex_unlock(&mutex);

}


void* threadFunctionUsedByThreadsPool(void *arg) {
    while (true) {
        pthread_mutex_lock(&mutex);
        if (tcpQueue.empty()) {  // can't get work from the queue then just wait
        pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
        task = tcpQueue.front();
        tcpQueue.pop(); 
        }
        pthread_mutex_unlock(&mutex);
    
        if (task) {
            // do task
        }
    }
    return NULL;
}

这个问题找了好几天还是没找到好的解决办法,我试过的最接近的是,当程序要退出的时候,把一个特殊的item推入队列,然后在threadFunctionUsedByThreadsPool里面,当检测到这样的项目,我会调用pthread_join,但是,当我使用gdb工具调试它时,那些预分配的线程仍然存在,任何人都可以提供帮助,最好是一些代码,例如,我如何修改threadFunctionUsedByThreadsPool,以便我可以正确退出所有预分配的线程? 非常感谢!!!

TLDR:您只需要一个线程安全变量,所有线程都可以检查工作项之间的退出条件。使用 pthread_join 等待线程退出。

首先,让我们在线程函数中正确设置条件变量的 while 循环。

而不是这个:

    pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
    task = tcpQueue.front();
    tcpQueue.pop();

检查条件变量唤醒前后队列的状态。虚假唤醒是真实存在的,不能保证另一个线程不会唤醒并获取最后一个工作项。您绝对不想从空队列中弹出。

更好:

    while (tcpQueue.empty()) {  
        pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
    }
    task = tcpQueue.front();
    tcpQueue.pop();

解决了这个问题后,我们可以引入一个新的全局布尔值来表示停止条件:

 bool stopCondition = false;

每当我们想要告诉池中的所有线程停止时,我们可以将 stopCondition 设置为 true 并向条件变量发出信号以提醒所有线程状态发生变化。读取或写入 stopCondition 应该在锁下完成。 (我想你也可以使用 std::atomic<bool>

把它们放在一起,你的线程函数就变成了这样:

void* threadFunctionUsedByThreadsPool(void* arg) {

    pthread_mutex_lock(&mutex);

    while (!stopCondition) {

        // wait for a task to be queued
        while (tcpQueue.empty() && !stopCondition) {  
            pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
        }

        if (stopCondition == false) {
            task = tcpQueue.front();
            tcpQueue.pop();

            // exit lock while operating on a task
            pthread_mutex_unlock(&mutex);

            if (task) {
                // do task
            }

            // re-acquire the lock
            pthread_mutex_lock(&mutex);

        }
 
    }

    // release the lock before exiting the function
    pthread_mutex_unlock(&mutex);
    return NULL;
}

然后是一个辅助函数,用于通知所有线程退出并等待每个线程停止。请注意,我们使用 pthread_cond_broadcast 通知所有线程从它们的条件变量等待中唤醒,而不是 pthread_cond_signal 只唤醒一个线程。

void stopThreadPool()
{

    // signal all threads to exit after they finish their current work item
    pthread_mutex_lock(&mutex);
        stopCondition = true;
        pthread_cond_broadcast(&condition_var); // notify all threads
    pthread_mutex_unlock(&mutex);

    // wait for all threads to exit
    for (auto& t : preAllocatedThreadsPool) {
        pthread_join(t, nullptr);
    }
    preAllocatedThreadsPool.clear();
}

我刚刚发现的最后一个错误 - 你的 main 没有像你想象的那样 属性 初始化你的 preAllocatedThreadsPool 向量。您正在制作 pthread_t 的副本,而不是实际使用向量中的句柄。

而不是这个:

for(pthread_t i : preallocatedThreadsPool) {

您的循环需要通过引用枚举:

更好:

for(pthread_t &i : preallocatedThreadsPool) {

发送指示池线程重新排队任务然后终止的任务。然后毒任务将 运行 遍历池中的所有线程,将它们全部杀死。我使用 null 作为毒药,(即非法任务)——当它杀死最后一个线程时不需要销毁它。您可能希望在发送 null/whatever 之前清除任务队列。如果使用 null,则只需要在任务出列之后在线程中进行 null 检查。

您只需要很少的额外代码,您不需要知道池中有多少个线程,它就可以工作:)