std::condition_variable::notify_all() 只唤醒我的线程池中的一个线程
std::condition_variable::notify_all() only wakes up one thread in my threadpool
我正在尝试编写一个非常简单的线程池来了解它们是如何在后台工作的。不过,我 运行 遇到了问题。当我使用 condition_variable 并调用 notify_all() 时,它只会唤醒我的池中的一个线程。
其他一切正常。我已经排队了 900 个作业,每个作业都有不错的有效负载。唤醒的一个线程消耗了所有这些工作,然后又回到睡眠状态。在下一个循环中,这一切再次发生。
问题是只有一个线程完成工作!我怎么搞砸了这个模板?
ThreadPool.h:
#pragma once
#include <mutex>
#include <stack>
#include <atomic>
#include <thread>
#include <condition_variable>
class ThreadPool
{
friend void __stdcall ThreadFunc();
public:
static ThreadPool & GetInstance()
{
static ThreadPool sInstance;
return (sInstance);
}
public:
void AddJob(Job * job);
void DoAllJobs();
private:
Job * GetJob();
private:
const static uint32_t ThreadCount = 8;
std::mutex JobMutex;
std::stack<Job *> Jobs;
volatile std::atomic<int> JobWorkCounter;
std::mutex SharedLock;
std::thread Threads[ThreadCount];
std::condition_variable Signal;
private:
ThreadPool();
~ThreadPool();
public:
ThreadPool(ThreadPool const &) = delete;
void operator = (ThreadPool const &) = delete;
};
ThreadPool.cpp:
#include "ThreadPool.h"
void __stdcall ThreadFunc()
{
std::unique_lock<std::mutex> lock(ThreadPool::GetInstance().SharedLock);
while (true)
{
ThreadPool::GetInstance().Signal.wait(lock);
while (Job * job = ThreadPool::GetInstance().GetJob())
{
job->_jobFn(job->_args);
ThreadPool::GetInstance().JobWorkCounter--;
}
}
}
ThreadPool::ThreadPool()
{
JobWorkCounter = 0;
for (uint32_t i = 0; i < ThreadCount; ++i)
Threads[i] = std::thread(ThreadFunc);
}
ThreadPool::~ThreadPool()
{
}
void ThreadPool::AddJob(Job * job)
{
JobWorkCounter++;
JobMutex.lock();
{
Jobs.push(job);
}
JobMutex.unlock();
}
void ThreadPool::DoAllJobs()
{
Signal.notify_all();
while (JobWorkCounter > 0)
{
Sleep(0);
}
}
Job * ThreadPool::GetJob()
{
Job * return_value = nullptr;
JobMutex.lock();
{
if (Jobs.empty() == false)
{
return_value = Jobs.top();
Jobs.pop();
}
}
JobMutex.unlock();
return (return_value);
}
感谢您的帮助!对不起大码post.
std::unique_lock<std::mutex> lock(ThreadPool::GetInstance().SharedLock);
每个线程首先获取此互斥体。
ThreadPool::GetInstance().Signal.wait(lock);
当主线程执行 notify_all()
时,所有线程都会收到来自条件变量的信号,但是您忘记了一个关键细节:在被条件变量通知后醒来后,互斥锁自动获取重新锁定。这就是 wait()
的工作原理。在你的 C++ 书籍或手册页中阅读它的文档;并且只有一个线程能够做到这一点。所有其他唤醒的线程也将尝试锁定互斥锁,但只有第一个赢得比赛并会这样做,所有其他线程将休眠并继续做梦。
收到通知后的线程将不会return从wait()
直到那个线程也成功重新锁定互斥体。
从wait()
到return必须发生两件事:线程从条件变量得到通知,和 线程重新锁定互斥体,成功。 wait()
解锁互斥量并等待条件变量,原子地,并在收到通知时重新锁定互斥量。
因此,幸运线程将锁定互斥量,并继续清空队列中的所有作业,然后返回循环顶部并再次 wait()
。这将解锁互斥量,现在其他一些幸运的线程(已收到通知但耐心等待它沐浴在阳光和荣耀中的机会)将能够锁定互斥量。以这种方式,所有其他线程将轮流,大象式,醒来,检查作业队列,在那里找不到任何东西,然后进入睡眠状态。
这就是您看到此行为的原因。
要使显示的代码线程安全,必须完成两件基本的事情。
1) 你不需要两个互斥量,一个就足够了。
2) 在 wait()
处理条件变量之前,检查作业队列中是否有内容。如果有东西,将其删除,并解锁互斥锁,然后执行此操作。
3) wait()
仅当作业队列为空时。在 wait()
returns 之后,重新锁定互斥量,然后检查作业队列是否仍然为空(此时你不能真正保证它不为空,只能保证它 可以非空)。
您只需要一个互斥锁来保护对非线程安全作业队列的访问,并等待条件变量。
除非您想设计一个新模式,否则使用条件变量的简单 "monkey-see monkey-do" 方法总是使用 3 种东西。
一个条件变量、一个互斥锁和一条消息。
std::condition_variable cv;
mutable std::mutex m;
your_message_type message;
然后有3种模式可以遵循。发送一条消息:
std::unique_lock l{m}; // C++17, don't need to pass type
set_message_data(message);
cv.notify_one();
发送大量消息:
std::unique_lock l{m};
set_lots_of_message_data(message);
cv.notify_all();
最后,等待并处理消息:
while(true) {
auto data = [&]()->std::optional<data_to_process>{
std::unique_lock l{m};
cv.wait( l, [&]{ return done() || there_is_a_message(message); } );
if (done()) return {};
return get_data_to_process(message);
}();
if (!data) break;
auto& data_to_process = *data;
// process the data
}
有一些灵活性。但是有很多规则要遵守。
在设置消息数据和通知之间,您必须锁定互斥锁。
你应该总是使用wait
的lambda版本——在没有lambda版本的情况下这样做意味着你做错了99次共 100 个。
消息数据应该足以确定是否应该完成任务,如果不是因为讨厌的线程和锁等等。
只使用 RAII 表示 lock/unlock 互斥。没有它的正确性几乎是不可能的。
处理东西时不要持有锁。持有锁足够长的时间让数据得到处理,然后放下锁。
您的代码违反了 2、3、4、5。我认为您没有搞砸 1。
但是,如果您在通知时锁定 cv,那么现代 cv 实现实际上非常高效。
我认为最明显的症状来自 3:您的工作线程始终持有锁,因此只有一个可以进步。其他人会导致您的代码出现其他问题。
现在,超越这个相对简单的模式是可能的。但是一旦你这样做了,你真的需要至少对 C++ 线程模型有一个基本的了解,你 不能 通过编写代码和 "see if it works" 来学习。您必须坐下来仔细阅读规范,了解条件变量在标准中的作用,了解互斥量的作用,编写一些代码,坐下来找出为什么它不起作用,找其他人写类似的代码和它有问题,找出其他人如何调试它并发现错误,回到你的代码并找到同样的错误,调整它,然后重复。
这就是我使用条件变量编写原语的原因,我不将条件变量与其他逻辑(比如维护线程池)混在一起。
写一个线程安全的队列。它所做的只是维护一个队列,并在有数据要读取时通知消费者。
最简单的有 3 个成员变量 -- 一个互斥量、一个条件变量和一个标准队列。
然后用关闭功能增强它 -- 现在 pop 必须 return 一个可选的或有一些其他的故障路径。
您的任务需要先对任务进行批处理,然后再将它们全部解雇。你确定你想要那个吗?为此,我要做的是在线程安全队列中添加一个 "push multiple tasks" 接口。然后在非线程安全队列中维护 "not ready" 个任务,只有在我们希望线程使用它们时才将它们全部推送。
然后"thread pool"消费线程安全队列。因为我们单独写了线程安全队列,我们有一半的移动部分,这意味着关系减少了 4 倍。
线程代码很难。尊重它。
我正在尝试编写一个非常简单的线程池来了解它们是如何在后台工作的。不过,我 运行 遇到了问题。当我使用 condition_variable 并调用 notify_all() 时,它只会唤醒我的池中的一个线程。
其他一切正常。我已经排队了 900 个作业,每个作业都有不错的有效负载。唤醒的一个线程消耗了所有这些工作,然后又回到睡眠状态。在下一个循环中,这一切再次发生。
问题是只有一个线程完成工作!我怎么搞砸了这个模板?
ThreadPool.h:
#pragma once
#include <mutex>
#include <stack>
#include <atomic>
#include <thread>
#include <condition_variable>
class ThreadPool
{
friend void __stdcall ThreadFunc();
public:
static ThreadPool & GetInstance()
{
static ThreadPool sInstance;
return (sInstance);
}
public:
void AddJob(Job * job);
void DoAllJobs();
private:
Job * GetJob();
private:
const static uint32_t ThreadCount = 8;
std::mutex JobMutex;
std::stack<Job *> Jobs;
volatile std::atomic<int> JobWorkCounter;
std::mutex SharedLock;
std::thread Threads[ThreadCount];
std::condition_variable Signal;
private:
ThreadPool();
~ThreadPool();
public:
ThreadPool(ThreadPool const &) = delete;
void operator = (ThreadPool const &) = delete;
};
ThreadPool.cpp:
#include "ThreadPool.h"
void __stdcall ThreadFunc()
{
std::unique_lock<std::mutex> lock(ThreadPool::GetInstance().SharedLock);
while (true)
{
ThreadPool::GetInstance().Signal.wait(lock);
while (Job * job = ThreadPool::GetInstance().GetJob())
{
job->_jobFn(job->_args);
ThreadPool::GetInstance().JobWorkCounter--;
}
}
}
ThreadPool::ThreadPool()
{
JobWorkCounter = 0;
for (uint32_t i = 0; i < ThreadCount; ++i)
Threads[i] = std::thread(ThreadFunc);
}
ThreadPool::~ThreadPool()
{
}
void ThreadPool::AddJob(Job * job)
{
JobWorkCounter++;
JobMutex.lock();
{
Jobs.push(job);
}
JobMutex.unlock();
}
void ThreadPool::DoAllJobs()
{
Signal.notify_all();
while (JobWorkCounter > 0)
{
Sleep(0);
}
}
Job * ThreadPool::GetJob()
{
Job * return_value = nullptr;
JobMutex.lock();
{
if (Jobs.empty() == false)
{
return_value = Jobs.top();
Jobs.pop();
}
}
JobMutex.unlock();
return (return_value);
}
感谢您的帮助!对不起大码post.
std::unique_lock<std::mutex> lock(ThreadPool::GetInstance().SharedLock);
每个线程首先获取此互斥体。
ThreadPool::GetInstance().Signal.wait(lock);
当主线程执行 notify_all()
时,所有线程都会收到来自条件变量的信号,但是您忘记了一个关键细节:在被条件变量通知后醒来后,互斥锁自动获取重新锁定。这就是 wait()
的工作原理。在你的 C++ 书籍或手册页中阅读它的文档;并且只有一个线程能够做到这一点。所有其他唤醒的线程也将尝试锁定互斥锁,但只有第一个赢得比赛并会这样做,所有其他线程将休眠并继续做梦。
收到通知后的线程将不会return从wait()
直到那个线程也成功重新锁定互斥体。
从wait()
到return必须发生两件事:线程从条件变量得到通知,和 线程重新锁定互斥体,成功。 wait()
解锁互斥量并等待条件变量,原子地,并在收到通知时重新锁定互斥量。
因此,幸运线程将锁定互斥量,并继续清空队列中的所有作业,然后返回循环顶部并再次 wait()
。这将解锁互斥量,现在其他一些幸运的线程(已收到通知但耐心等待它沐浴在阳光和荣耀中的机会)将能够锁定互斥量。以这种方式,所有其他线程将轮流,大象式,醒来,检查作业队列,在那里找不到任何东西,然后进入睡眠状态。
这就是您看到此行为的原因。
要使显示的代码线程安全,必须完成两件基本的事情。
1) 你不需要两个互斥量,一个就足够了。
2) 在 wait()
处理条件变量之前,检查作业队列中是否有内容。如果有东西,将其删除,并解锁互斥锁,然后执行此操作。
3) wait()
仅当作业队列为空时。在 wait()
returns 之后,重新锁定互斥量,然后检查作业队列是否仍然为空(此时你不能真正保证它不为空,只能保证它 可以非空)。
您只需要一个互斥锁来保护对非线程安全作业队列的访问,并等待条件变量。
除非您想设计一个新模式,否则使用条件变量的简单 "monkey-see monkey-do" 方法总是使用 3 种东西。
一个条件变量、一个互斥锁和一条消息。
std::condition_variable cv;
mutable std::mutex m;
your_message_type message;
然后有3种模式可以遵循。发送一条消息:
std::unique_lock l{m}; // C++17, don't need to pass type
set_message_data(message);
cv.notify_one();
发送大量消息:
std::unique_lock l{m};
set_lots_of_message_data(message);
cv.notify_all();
最后,等待并处理消息:
while(true) {
auto data = [&]()->std::optional<data_to_process>{
std::unique_lock l{m};
cv.wait( l, [&]{ return done() || there_is_a_message(message); } );
if (done()) return {};
return get_data_to_process(message);
}();
if (!data) break;
auto& data_to_process = *data;
// process the data
}
有一些灵活性。但是有很多规则要遵守。
在设置消息数据和通知之间,您必须锁定互斥锁。
你应该总是使用
wait
的lambda版本——在没有lambda版本的情况下这样做意味着你做错了99次共 100 个。消息数据应该足以确定是否应该完成任务,如果不是因为讨厌的线程和锁等等。
只使用 RAII 表示 lock/unlock 互斥。没有它的正确性几乎是不可能的。
处理东西时不要持有锁。持有锁足够长的时间让数据得到处理,然后放下锁。
您的代码违反了 2、3、4、5。我认为您没有搞砸 1。
但是,如果您在通知时锁定 cv,那么现代 cv 实现实际上非常高效。
我认为最明显的症状来自 3:您的工作线程始终持有锁,因此只有一个可以进步。其他人会导致您的代码出现其他问题。
现在,超越这个相对简单的模式是可能的。但是一旦你这样做了,你真的需要至少对 C++ 线程模型有一个基本的了解,你 不能 通过编写代码和 "see if it works" 来学习。您必须坐下来仔细阅读规范,了解条件变量在标准中的作用,了解互斥量的作用,编写一些代码,坐下来找出为什么它不起作用,找其他人写类似的代码和它有问题,找出其他人如何调试它并发现错误,回到你的代码并找到同样的错误,调整它,然后重复。
这就是我使用条件变量编写原语的原因,我不将条件变量与其他逻辑(比如维护线程池)混在一起。
写一个线程安全的队列。它所做的只是维护一个队列,并在有数据要读取时通知消费者。
最简单的有 3 个成员变量 -- 一个互斥量、一个条件变量和一个标准队列。
然后用关闭功能增强它 -- 现在 pop 必须 return 一个可选的或有一些其他的故障路径。
您的任务需要先对任务进行批处理,然后再将它们全部解雇。你确定你想要那个吗?为此,我要做的是在线程安全队列中添加一个 "push multiple tasks" 接口。然后在非线程安全队列中维护 "not ready" 个任务,只有在我们希望线程使用它们时才将它们全部推送。
然后"thread pool"消费线程安全队列。因为我们单独写了线程安全队列,我们有一半的移动部分,这意味着关系减少了 4 倍。
线程代码很难。尊重它。