C ++多线程在分配给原子之前锁定互斥锁
C++ multithreading locking a mutex before assigning to an atomic
在 C++ 中,您需要在分配给原子之前锁定互斥量吗?我尝试实现线程池,如此处所示 。为此,我创建了一个线程安全队列并使用了原子。特别是,在 shutdown
方法(或在我的代码中 waitForCompletion
)中,需要将线程池循环函数 while 循环变量设置为 true,以便线程可以完成其工作并加入。但是由于原子是线程安全的,所以在关闭方法中为它分配 true 之前我没有锁定互斥量,如下所示。这最终导致了僵局。为什么会这样?
ThreadPool.hpp:
#pragma once
#include <atomic>
#include <vector>
#include <iostream>
#include <thread>
#include <future>
#include <mutex>
#include <queue>
#include <functional>
#include <ThreadSafeQueue.hpp>
class ThreadPool{
public:
ThreadPool(std::atomic_bool& result);
void waitForCompletion();
void addJob(std::function<bool()> newJob);
void setComplete();
private:
void workLoop(std::atomic_bool& result);
int m_numThreads;
std::vector<std::thread> m_threads;
std::atomic_bool m_workComplete;
std::mutex m_mutex;
std::condition_variable m_jobWaitCondition;
ThreadSafeQueue<std::function<bool()>> m_JobQueue;
};
ThreadPool.cpp:
#include <ThreadPool.hpp>
ThreadPool::ThreadPool(std::atomic_bool& result){
m_numThreads = std::thread::hardware_concurrency();
m_workComplete = false;
for (int i = 0; i < m_numThreads; i++)
{
m_threads.push_back(std::thread(&ThreadPool::workLoop, this, std::ref(result)));
}
}
// each thread executes this loop
void ThreadPool::workLoop(std::atomic_bool& result){
while(!m_workComplete){
std::function<bool()> currentJob;
bool popped;
{
std::unique_lock<std::mutex> lock(m_mutex);
m_jobWaitCondition.wait(lock, [this](){
return !m_JobQueue.empty() || m_workComplete.load();
});
popped = m_JobQueue.pop(currentJob);
}
if(popped){
result = currentJob() && result;
}
}
}
void ThreadPool::addJob(std::function<bool()> newJob){
m_JobQueue.push(newJob);
m_jobWaitCondition.notify_one();
}
void ThreadPool::setComplete(){
m_workComplete = true;
}
void ThreadPool::waitForCompletion(){
{
std::unique_lock<std::mutex> lock(m_mutex);
m_workComplete.store(true);
}
m_jobWaitCondition.notify_all();
for(auto& thread : m_threads){
thread.join();
}
m_threads.clear();
}
ThreadSafeQueue.hpp:
#pragma once
#include <mutex>
#include <queue>
template <class T>
class ThreadSafeQueue {
public:
ThreadSafeQueue(){};
void push(T element) {
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.push(element);
}
bool pop(T& retElement) {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_queue.empty()) {
return false;
}
retElement = m_queue.front();
m_queue.pop();
return true;
}
bool empty(){
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty();
}
private:
std::queue<T> m_queue;
std::mutex m_mutex;
};
您可能想在 workLoop() 中的 wait() returns 之后检查 m_workComplete,否则您可能会在空队列上调用 pop(),这很糟糕。
您在等待条件时遇到了死锁。虽然只有在添加新作业时才会通知条件。您的线程正在等待通知该条件。您可能对条件“条件”进行非确定性(从您的角度来看)检查,但您可能不依赖它们存在。
当任务是 completed.One 可能的位置时,您需要通知您的条件,即当您要求等待完成时或在可以实现完成状态的任何时候。
我把你的代码改成这样来说明:
// each thread executes this loop
void ThreadPool::workLoop(std::atomic_bool& result){
while(!m_workComplete)
{
std::function<bool()> currentJob;
bool popped;
{
std::cout<<"Before the lock"<<std::endl;
std::unique_lock<std::mutex> lock(m_mutex);
std::cout<<"After lock"<<std::endl;
m_jobWaitCondition.wait(lock, [this]()
{
bool res = (!m_JobQueue.empty() || m_workComplete.load() );
std::cout<<"res:"<<res<<std::endl;
return res;
});
std::cout<<"After wait"<<std::endl;
popped = m_JobQueue.pop(currentJob);
}
if(popped)
{
std::cout<<"Popped"<<std::endl;
result = currentJob() && result;
std::cout<<"Popped 2"<<std::endl;
}
}
std::cout<<"LEave"<<std::endl;
}
void ThreadPool::addJob(std::function<bool()> newJob){
m_JobQueue.push(newJob);
std::cout<<"before call notify"<<std::endl;
m_jobWaitCondition.notify_one();
std::cout<<"After call notify"<<std::endl;
}
我添加了一个作业,打印的内容是:
锁定前
锁定后
res:0
上锁前
锁定后
上锁前
上锁前
上锁前
res:0
上锁前
锁定后
res:0
锁定后
res:0
上锁前
锁定后
res:0
锁定后
通话前 notifyres:1
锁前等待后
弹出
锁定后
res:0
调用后通知res:0
弹出 2
上锁前
res:0
res:0
res:0
res:0
锁定后
res:0
锁定后
res:0
注意最后一个通知是在最后一个“锁定后”行之前调用的(在条件等待之前)
在 C++ 中,您需要在分配给原子之前锁定互斥量吗?我尝试实现线程池,如此处所示 。为此,我创建了一个线程安全队列并使用了原子。特别是,在 shutdown
方法(或在我的代码中 waitForCompletion
)中,需要将线程池循环函数 while 循环变量设置为 true,以便线程可以完成其工作并加入。但是由于原子是线程安全的,所以在关闭方法中为它分配 true 之前我没有锁定互斥量,如下所示。这最终导致了僵局。为什么会这样?
ThreadPool.hpp:
#pragma once
#include <atomic>
#include <vector>
#include <iostream>
#include <thread>
#include <future>
#include <mutex>
#include <queue>
#include <functional>
#include <ThreadSafeQueue.hpp>
class ThreadPool{
public:
ThreadPool(std::atomic_bool& result);
void waitForCompletion();
void addJob(std::function<bool()> newJob);
void setComplete();
private:
void workLoop(std::atomic_bool& result);
int m_numThreads;
std::vector<std::thread> m_threads;
std::atomic_bool m_workComplete;
std::mutex m_mutex;
std::condition_variable m_jobWaitCondition;
ThreadSafeQueue<std::function<bool()>> m_JobQueue;
};
ThreadPool.cpp:
#include <ThreadPool.hpp>
ThreadPool::ThreadPool(std::atomic_bool& result){
m_numThreads = std::thread::hardware_concurrency();
m_workComplete = false;
for (int i = 0; i < m_numThreads; i++)
{
m_threads.push_back(std::thread(&ThreadPool::workLoop, this, std::ref(result)));
}
}
// each thread executes this loop
void ThreadPool::workLoop(std::atomic_bool& result){
while(!m_workComplete){
std::function<bool()> currentJob;
bool popped;
{
std::unique_lock<std::mutex> lock(m_mutex);
m_jobWaitCondition.wait(lock, [this](){
return !m_JobQueue.empty() || m_workComplete.load();
});
popped = m_JobQueue.pop(currentJob);
}
if(popped){
result = currentJob() && result;
}
}
}
void ThreadPool::addJob(std::function<bool()> newJob){
m_JobQueue.push(newJob);
m_jobWaitCondition.notify_one();
}
void ThreadPool::setComplete(){
m_workComplete = true;
}
void ThreadPool::waitForCompletion(){
{
std::unique_lock<std::mutex> lock(m_mutex);
m_workComplete.store(true);
}
m_jobWaitCondition.notify_all();
for(auto& thread : m_threads){
thread.join();
}
m_threads.clear();
}
ThreadSafeQueue.hpp:
#pragma once
#include <mutex>
#include <queue>
template <class T>
class ThreadSafeQueue {
public:
ThreadSafeQueue(){};
void push(T element) {
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.push(element);
}
bool pop(T& retElement) {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_queue.empty()) {
return false;
}
retElement = m_queue.front();
m_queue.pop();
return true;
}
bool empty(){
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty();
}
private:
std::queue<T> m_queue;
std::mutex m_mutex;
};
您可能想在 workLoop() 中的 wait() returns 之后检查 m_workComplete,否则您可能会在空队列上调用 pop(),这很糟糕。
您在等待条件时遇到了死锁。虽然只有在添加新作业时才会通知条件。您的线程正在等待通知该条件。您可能对条件“条件”进行非确定性(从您的角度来看)检查,但您可能不依赖它们存在。
当任务是 completed.One 可能的位置时,您需要通知您的条件,即当您要求等待完成时或在可以实现完成状态的任何时候。
我把你的代码改成这样来说明:
// each thread executes this loop
void ThreadPool::workLoop(std::atomic_bool& result){
while(!m_workComplete)
{
std::function<bool()> currentJob;
bool popped;
{
std::cout<<"Before the lock"<<std::endl;
std::unique_lock<std::mutex> lock(m_mutex);
std::cout<<"After lock"<<std::endl;
m_jobWaitCondition.wait(lock, [this]()
{
bool res = (!m_JobQueue.empty() || m_workComplete.load() );
std::cout<<"res:"<<res<<std::endl;
return res;
});
std::cout<<"After wait"<<std::endl;
popped = m_JobQueue.pop(currentJob);
}
if(popped)
{
std::cout<<"Popped"<<std::endl;
result = currentJob() && result;
std::cout<<"Popped 2"<<std::endl;
}
}
std::cout<<"LEave"<<std::endl;
}
void ThreadPool::addJob(std::function<bool()> newJob){
m_JobQueue.push(newJob);
std::cout<<"before call notify"<<std::endl;
m_jobWaitCondition.notify_one();
std::cout<<"After call notify"<<std::endl;
}
我添加了一个作业,打印的内容是:
锁定前 锁定后 res:0 上锁前 锁定后 上锁前 上锁前 上锁前 res:0 上锁前 锁定后 res:0 锁定后 res:0 上锁前 锁定后 res:0 锁定后 通话前 notifyres:1
锁前等待后
弹出 锁定后 res:0 调用后通知res:0
弹出 2 上锁前 res:0 res:0 res:0 res:0 锁定后
res:0
锁定后
res:0
注意最后一个通知是在最后一个“锁定后”行之前调用的(在条件等待之前)