C ++多线程在分配给原子之前锁定互斥锁

C++ multithreading locking a mutex before assigning to an atomic

在 C++ 中,您需要在分配给原子之前锁定互斥量吗?我尝试实现线程池,如此处所示 。为此,我创建了一个线程安全队列并使用了原子。特别是,在 shutdown 方法(或在我的代码中 waitForCompletion)中,需要将线程池循环函数 while 循环变量设置为 true,以便线程可以完成其工作并加入。但是由于原子是线程安全的,所以在关闭方法中为它分配 true 之前我没有锁定互斥量,如下所示。这最终导致了僵局。为什么会这样?

ThreadPool.hpp:

#pragma once 

#include <atomic> 
#include <vector> 
#include <iostream> 
#include <thread>
#include <future>
#include <mutex>
#include <queue>
#include <functional>
#include <ThreadSafeQueue.hpp>

class ThreadPool{
    public: 
        ThreadPool(std::atomic_bool& result); 
        void waitForCompletion();
        void addJob(std::function<bool()> newJob);
        void setComplete();
    private: 
        void workLoop(std::atomic_bool& result); 
        int m_numThreads; 
        std::vector<std::thread> m_threads; 
        std::atomic_bool m_workComplete; 
        std::mutex m_mutex; 
        std::condition_variable m_jobWaitCondition; 
        ThreadSafeQueue<std::function<bool()>> m_JobQueue;
};

ThreadPool.cpp:

#include <ThreadPool.hpp> 

ThreadPool::ThreadPool(std::atomic_bool& result){ 
    m_numThreads = std::thread::hardware_concurrency();
    m_workComplete = false;
    for (int i = 0; i < m_numThreads; i++)
    {
        m_threads.push_back(std::thread(&ThreadPool::workLoop, this, std::ref(result)));
    }
}

// each thread executes this loop 
void ThreadPool::workLoop(std::atomic_bool& result){ 
    while(!m_workComplete){
        std::function<bool()> currentJob;
        bool popped;
        {
            std::unique_lock<std::mutex> lock(m_mutex); 
            m_jobWaitCondition.wait(lock, [this](){
                return !m_JobQueue.empty() || m_workComplete.load();
            });
            
            popped = m_JobQueue.pop(currentJob);
        }
        if(popped){
            result = currentJob() && result;
        }
    }
}

void ThreadPool::addJob(std::function<bool()> newJob){ 
    m_JobQueue.push(newJob);
    m_jobWaitCondition.notify_one();
}

void ThreadPool::setComplete(){
    m_workComplete = true; 
}

void ThreadPool::waitForCompletion(){
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_workComplete.store(true);
    }
    
    m_jobWaitCondition.notify_all();

    for(auto& thread : m_threads){ 
        thread.join();
    }
    
    m_threads.clear();
}

ThreadSafeQueue.hpp:

#pragma once

#include <mutex>
#include <queue>

template <class T>
class ThreadSafeQueue {
   public:
    ThreadSafeQueue(){};
    void push(T element) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_queue.push(element);
    }
    bool pop(T& retElement) {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (m_queue.empty()) {
            return false;
        }
        retElement = m_queue.front();
        m_queue.pop();
        return true;
    }
    bool empty(){ 
        std::unique_lock<std::mutex> lock(m_mutex); 
        return m_queue.empty();
    }

   private:
    std::queue<T> m_queue;
    std::mutex m_mutex;
};

您可能想在 workLoop() 中的 wait() returns 之后检查 m_workComplete,否则您可能会在空队列上调用 pop(),这很糟糕。

您在等待条件时遇到了死锁。虽然只有在添加新作业时才会通知条件。您的线程正在等待通知该条件。您可能对条件“条件”进行非确定性(从您的角度来看)检查,但您可能不依赖它们存在。

当任务是 completed.One 可能的位置时,您需要通知您的条件,即当您要求等待完成时或在可以实现完成状态的任何时候。

我把你的代码改成这样来说明:

// each thread executes this loop 
void ThreadPool::workLoop(std::atomic_bool& result){ 
    while(!m_workComplete)
    {
        std::function<bool()> currentJob;
        bool popped;
        {
        std::cout<<"Before the lock"<<std::endl;
            std::unique_lock<std::mutex> lock(m_mutex); 
        std::cout<<"After lock"<<std::endl;
            m_jobWaitCondition.wait(lock, [this]()
        {
            bool res = (!m_JobQueue.empty() || m_workComplete.load() );
        std::cout<<"res:"<<res<<std::endl;
                return res;
            });
        std::cout<<"After wait"<<std::endl;
            
            popped = m_JobQueue.pop(currentJob);
        }
        if(popped)
    { 
        std::cout<<"Popped"<<std::endl;
            result = currentJob() && result;
        std::cout<<"Popped 2"<<std::endl;
        }
    }
    std::cout<<"LEave"<<std::endl;
}

void ThreadPool::addJob(std::function<bool()> newJob){ 
    m_JobQueue.push(newJob);
    std::cout<<"before call notify"<<std::endl;
    m_jobWaitCondition.notify_one();
    std::cout<<"After call notify"<<std::endl;
}

我添加了一个作业,打印的内容是:

锁定前 锁定后 res:0 上锁前 锁定后 上锁前 上锁前 上锁前 res:0 上锁前 锁定后 res:0 锁定后 res:0 上锁前 锁定后 res:0 锁定后 通话前 notifyres:1

锁前等待后

弹出 锁定后 res:0 调用后通知res:0

弹出 2 上锁前 res:0 res:0 res:0 res:0 锁定后

res:0

锁定后

res:0

注意最后一个通知是在最后一个“锁定后”行之前调用的(在条件等待之前)