std::queue producer/consumer 的最小互斥量
Minimal mutexes for std::queue producer/consumer
我有两个线程在 std::queue 的生产者端和消费者端工作。队列并不经常满,所以我想避免消费者抓住保护队列变异的互斥体。
是否可以在互斥量之外调用 empty()
然后仅在队列中有内容时才获取互斥量?
例如:
struct MyData{
int a;
int b;
};
class SpeedyAccess{
public:
void AddDataFromThread1(MyData data){
const std::lock_guard<std::mutex> queueMutexLock(queueAccess);
workQueue.push(data);
}
void CheckFromThread2(){
if(!workQueue.empty()) // Un-protected access...is this dangerous?
{
queueAccess.lock();
MyData data = workQueue.front();
workQueue.pop();
queueAccess.unlock();
ExpensiveComputation(data);
}
}
private:
void ExpensiveComputation(MyData& data);
std::queue<MyData> workQueue;
std::mutex queueAccess;
}
线程 2 执行检查并且不是特别时间关键,但会被调用很多(500/秒?)。线程 1 的时间非常关键,很多东西需要 运行 在那里,但调用频率不高(最多 20 次/秒)。
如果我在 empty()
周围添加一个互斥保护,如果线程 2 到来时队列为空,它不会长时间持有互斥,因此可能不会大受欢迎。然而,由于它被如此频繁地调用,它可能偶尔会在同一时间发生某些东西试图放在后面......这会导致线程 1 中的大量等待吗?
正如上面评论中所写,您应该只在锁定的情况下调用 empty()
。
但我相信有更好的方法。
您可以使用 std::condition_variable
together with a std::mutex
来实现对队列访问的同步,而不需要比您必须的更多地锁定互斥量。
但是 - 使用 std::condition_variable
时,您必须意识到它会受到 虚假唤醒 的影响。您可以在这里阅读:Spurious wakeup - Wikipedia.
您可以在此处查看一些代码示例:
Condition variable examples.
使用 std::condition_variable
的正确方法如下所示(带有一些注释)。
这只是展示原理的最小示例。
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <iostream>
using MyData = int;
std::mutex mtx;
std::condition_variable cond_var;
std::queue<MyData> q;
void producer()
{
MyData produced_val = 0;
while (true)
{
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // simulate some pause between productions
++produced_val;
std::cout << "produced: " << produced_val << std::endl;
{
// Access the Q under the lock:
std::unique_lock<std::mutex> lck(mtx);
q.push(produced_val);
cond_var.notify_all(); // It's not a must to nofity under the lock but it might be more efficient (see @DavidSchwartz's comment below).
}
}
}
void consumer()
{
while (true)
{
MyData consumed_val;
{
// Access the Q under the lock:
std::unique_lock<std::mutex> lck(mtx);
// NOTE: The following call will lock the mutex only when the the condition_varible will cause wakeup
// (due to `notify` or spurious wakeup).
// Then it will check if the Q is empty.
// If empty it will release the lock and continue to wait.
// If not empty, the lock will be kept until out of scope.
// See the documentation for std::condition_variable.
cond_var.wait(lck, []() { return !q.empty(); }); // will loop internally to handle spurious wakeups
consumed_val = q.front();
q.pop();
}
std::cout << "consumed: " << consumed_val << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // simulate some calculation
}
}
int main()
{
std::thread p(producer);
std::thread c(consumer);
while(true) {}
p.join(); c.join(); // will never happen in our case but to remind us what is needed.
return 0;
}
一些注意事项:
- 在您的实际代码中,none 个线程应该永远 运行。你应该有一些机制来通知他们优雅地退出。
- 全局变量(
mtx
、q
等)最好是某些上下文class的成员,或者传递给producer()
和consumer()
作为参数。
- 为简单起见,此示例假设生产者的生产率始终低于消费者的生产率。在您的真实代码中,您可以使其更通用,方法是让消费者在每次发出
condition_variable
信号时提取 Q 中的所有元素。
- 您可以“玩”生产者和消费者的
sleep_for
次来测试各种时序情况。
我有两个线程在 std::queue 的生产者端和消费者端工作。队列并不经常满,所以我想避免消费者抓住保护队列变异的互斥体。
是否可以在互斥量之外调用 empty()
然后仅在队列中有内容时才获取互斥量?
例如:
struct MyData{
int a;
int b;
};
class SpeedyAccess{
public:
void AddDataFromThread1(MyData data){
const std::lock_guard<std::mutex> queueMutexLock(queueAccess);
workQueue.push(data);
}
void CheckFromThread2(){
if(!workQueue.empty()) // Un-protected access...is this dangerous?
{
queueAccess.lock();
MyData data = workQueue.front();
workQueue.pop();
queueAccess.unlock();
ExpensiveComputation(data);
}
}
private:
void ExpensiveComputation(MyData& data);
std::queue<MyData> workQueue;
std::mutex queueAccess;
}
线程 2 执行检查并且不是特别时间关键,但会被调用很多(500/秒?)。线程 1 的时间非常关键,很多东西需要 运行 在那里,但调用频率不高(最多 20 次/秒)。
如果我在 empty()
周围添加一个互斥保护,如果线程 2 到来时队列为空,它不会长时间持有互斥,因此可能不会大受欢迎。然而,由于它被如此频繁地调用,它可能偶尔会在同一时间发生某些东西试图放在后面......这会导致线程 1 中的大量等待吗?
正如上面评论中所写,您应该只在锁定的情况下调用 empty()
。
但我相信有更好的方法。
您可以使用 std::condition_variable
together with a std::mutex
来实现对队列访问的同步,而不需要比您必须的更多地锁定互斥量。
但是 - 使用 std::condition_variable
时,您必须意识到它会受到 虚假唤醒 的影响。您可以在这里阅读:Spurious wakeup - Wikipedia.
您可以在此处查看一些代码示例:
Condition variable examples.
使用 std::condition_variable
的正确方法如下所示(带有一些注释)。
这只是展示原理的最小示例。
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <iostream>
using MyData = int;
std::mutex mtx;
std::condition_variable cond_var;
std::queue<MyData> q;
void producer()
{
MyData produced_val = 0;
while (true)
{
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // simulate some pause between productions
++produced_val;
std::cout << "produced: " << produced_val << std::endl;
{
// Access the Q under the lock:
std::unique_lock<std::mutex> lck(mtx);
q.push(produced_val);
cond_var.notify_all(); // It's not a must to nofity under the lock but it might be more efficient (see @DavidSchwartz's comment below).
}
}
}
void consumer()
{
while (true)
{
MyData consumed_val;
{
// Access the Q under the lock:
std::unique_lock<std::mutex> lck(mtx);
// NOTE: The following call will lock the mutex only when the the condition_varible will cause wakeup
// (due to `notify` or spurious wakeup).
// Then it will check if the Q is empty.
// If empty it will release the lock and continue to wait.
// If not empty, the lock will be kept until out of scope.
// See the documentation for std::condition_variable.
cond_var.wait(lck, []() { return !q.empty(); }); // will loop internally to handle spurious wakeups
consumed_val = q.front();
q.pop();
}
std::cout << "consumed: " << consumed_val << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // simulate some calculation
}
}
int main()
{
std::thread p(producer);
std::thread c(consumer);
while(true) {}
p.join(); c.join(); // will never happen in our case but to remind us what is needed.
return 0;
}
一些注意事项:
- 在您的实际代码中,none 个线程应该永远 运行。你应该有一些机制来通知他们优雅地退出。
- 全局变量(
mtx
、q
等)最好是某些上下文class的成员,或者传递给producer()
和consumer()
作为参数。 - 为简单起见,此示例假设生产者的生产率始终低于消费者的生产率。在您的真实代码中,您可以使其更通用,方法是让消费者在每次发出
condition_variable
信号时提取 Q 中的所有元素。 - 您可以“玩”生产者和消费者的
sleep_for
次来测试各种时序情况。