在无限循环或更新函数中锁定互斥锁有多糟糕
How bad it is to lock a mutex in an infinite loop or an update function
std::queue<double> some_q;
std::mutex mu_q;
/* an update function may be an event observer */
void UpdateFunc()
{
/* some other processing */
std::lock_guard lock{ mu_q };
while (!some_q.empty())
{
const auto& val = some_q.front();
/* update different states according to val */
some_q.pop();
}
/* some other processing */
}
/* some other thread might add some values after processing some other inputs */
void AddVal(...)
{
std::lock_guard lock{ mu_q };
some_q.push(...);
}
对于这种情况,可以这样处理队列吗?
或者如果我尝试使用像 boost 一样的无锁队列会更好吗?
您可以像当前假设在所有线程中正确使用锁一样使用它。但是,您可能 运行 对如何调用 updateFunc() 感到沮丧。
- 您要使用回调吗?
- 您要使用 ISR 吗?
- 你要投票吗?
如果您使用第 3 方库,它通常会使线程同步和队列变得微不足道
例如,如果您使用 CMSIS RTOS(v2)。让多个线程在彼此之间传递信息是一个相当直接的过程。您可以有多个生产者和一个消费者。
单个消费者可以在一个永远的循环中等待,在该循环中等待接收消息,然后再执行其工作
when timeout is set to osWaitForever the function will wait for an
infinite time until the message is retrieved (i.e. wait semantics).
// Two producers
osMessageQueuePut(X,Y,Z,timeout=0)
osMessageQueuePut(X,Y,Z,timeout=0)
// One consumer which will run only once something enters the queue
osMessageQueueGet(X,Y,Z,osWaitForever)
tldr;您可以安全地继续,但使用库可能会使您的同步问题更容易。
How bad it is to lock a mutex in an infinite loop or an update function
这很糟糕。无限循环实际上使您的程序具有 未定义的行为,除非它执行以下操作之一:
- 终止
- 调用库 I/O 函数
- 通过 volatile glvalue 执行访问
- 执行同步操作或原子操作
在进入循环之前获取互斥锁并持有它不算执行同步操作(在循环中)。此外,当持有互斥量时,没有人可以向队列中添加信息,因此在处理您提取的信息时,所有想要添加到队列中的线程都必须等待 - 并且没有其他想要分担负载的工作线程可以从排队。通常最好从队列中提取一项任务,释放锁,然后使用您得到的任务。
常用的方法是使用一个condition_variable
让其他线程获取锁,然后通知其他等待相同condition_variable
的线程。 CPU 在等待时将非常接近空闲,并在需要时唤醒以完成工作。
以您的程序为基础,它可能看起来像这样:
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
std::queue<double> some_q;
std::mutex mu_q;
std::condition_variable cv_q; // the condition variable
bool stop_q = false; // something to signal the worker thread to quit
/* an update function may be an event observer */
void UpdateFunc() {
while(true) {
double val;
{
std::unique_lock lock{mu_q};
// cv_q.wait lets others acquire the lock to work with the queue
// while it waits to be notified.
while (not stop_q && some_q.empty()) cv_q.wait(lock);
if(stop_q) break; // time to quit
val = std::move(some_q.front());
some_q.pop();
} // lock released so others can use the queue
// do time consuming work with "val" here
std::cout << "got " << val << '\n';
}
}
/* some other thread might add some values after processing some other inputs */
void AddVal(double val) {
std::lock_guard lock{mu_q};
some_q.push(val);
cv_q.notify_one(); // notify someone that there's a new value to work with
}
void StopQ() { // a function to set the queue in shutdown mode
std::lock_guard lock{mu_q};
stop_q = true;
cv_q.notify_all(); // notify all that it's time to stop
}
int main() {
auto th = std::thread(UpdateFunc);
// simulate some events coming with some time apart
std::this_thread::sleep_for(std::chrono::seconds(1));
AddVal(1.2);
std::this_thread::sleep_for(std::chrono::seconds(1));
AddVal(3.4);
std::this_thread::sleep_for(std::chrono::seconds(1));
AddVal(5.6);
std::this_thread::sleep_for(std::chrono::seconds(1));
StopQ();
th.join();
}
如果您真的想处理当前队列中的所有内容,请先提取所有内容然后释放锁,然后处理您提取的内容。只需换入另一个 std::queue
,即可快速从队列中提取所有内容。示例:
#include <atomic>
std::atomic<bool> stop_q{}; // needs to be atomic in this version
void UpdateFunc() {
while(not stop_q) {
std::queue<double> work; // this will be used to swap with some_q
{
std::unique_lock lock{mu_q};
// cv_q.wait lets others acquire the lock to work with the queue
// while it waits to be notified.
while (not stop_q && some_q.empty()) cv_q.wait(lock);
std::swap(work, some_q); // extract everything from the queue at once
} // lock released so others can use the queue
// do time consuming work here
while(not stop_q && not work.empty()) {
auto val = std::move(work.front());
work.pop();
std::cout << "got " << val << '\n';
}
}
}
std::queue<double> some_q;
std::mutex mu_q;
/* an update function may be an event observer */
void UpdateFunc()
{
/* some other processing */
std::lock_guard lock{ mu_q };
while (!some_q.empty())
{
const auto& val = some_q.front();
/* update different states according to val */
some_q.pop();
}
/* some other processing */
}
/* some other thread might add some values after processing some other inputs */
void AddVal(...)
{
std::lock_guard lock{ mu_q };
some_q.push(...);
}
对于这种情况,可以这样处理队列吗? 或者如果我尝试使用像 boost 一样的无锁队列会更好吗?
您可以像当前假设在所有线程中正确使用锁一样使用它。但是,您可能 运行 对如何调用 updateFunc() 感到沮丧。
- 您要使用回调吗?
- 您要使用 ISR 吗?
- 你要投票吗?
如果您使用第 3 方库,它通常会使线程同步和队列变得微不足道
例如,如果您使用 CMSIS RTOS(v2)。让多个线程在彼此之间传递信息是一个相当直接的过程。您可以有多个生产者和一个消费者。
单个消费者可以在一个永远的循环中等待,在该循环中等待接收消息,然后再执行其工作
when timeout is set to osWaitForever the function will wait for an infinite time until the message is retrieved (i.e. wait semantics).
// Two producers
osMessageQueuePut(X,Y,Z,timeout=0)
osMessageQueuePut(X,Y,Z,timeout=0)
// One consumer which will run only once something enters the queue
osMessageQueueGet(X,Y,Z,osWaitForever)
tldr;您可以安全地继续,但使用库可能会使您的同步问题更容易。
How bad it is to lock a mutex in an infinite loop or an update function
这很糟糕。无限循环实际上使您的程序具有 未定义的行为,除非它执行以下操作之一:
- 终止
- 调用库 I/O 函数
- 通过 volatile glvalue 执行访问
- 执行同步操作或原子操作
在进入循环之前获取互斥锁并持有它不算执行同步操作(在循环中)。此外,当持有互斥量时,没有人可以向队列中添加信息,因此在处理您提取的信息时,所有想要添加到队列中的线程都必须等待 - 并且没有其他想要分担负载的工作线程可以从排队。通常最好从队列中提取一项任务,释放锁,然后使用您得到的任务。
常用的方法是使用一个condition_variable
让其他线程获取锁,然后通知其他等待相同condition_variable
的线程。 CPU 在等待时将非常接近空闲,并在需要时唤醒以完成工作。
以您的程序为基础,它可能看起来像这样:
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
std::queue<double> some_q;
std::mutex mu_q;
std::condition_variable cv_q; // the condition variable
bool stop_q = false; // something to signal the worker thread to quit
/* an update function may be an event observer */
void UpdateFunc() {
while(true) {
double val;
{
std::unique_lock lock{mu_q};
// cv_q.wait lets others acquire the lock to work with the queue
// while it waits to be notified.
while (not stop_q && some_q.empty()) cv_q.wait(lock);
if(stop_q) break; // time to quit
val = std::move(some_q.front());
some_q.pop();
} // lock released so others can use the queue
// do time consuming work with "val" here
std::cout << "got " << val << '\n';
}
}
/* some other thread might add some values after processing some other inputs */
void AddVal(double val) {
std::lock_guard lock{mu_q};
some_q.push(val);
cv_q.notify_one(); // notify someone that there's a new value to work with
}
void StopQ() { // a function to set the queue in shutdown mode
std::lock_guard lock{mu_q};
stop_q = true;
cv_q.notify_all(); // notify all that it's time to stop
}
int main() {
auto th = std::thread(UpdateFunc);
// simulate some events coming with some time apart
std::this_thread::sleep_for(std::chrono::seconds(1));
AddVal(1.2);
std::this_thread::sleep_for(std::chrono::seconds(1));
AddVal(3.4);
std::this_thread::sleep_for(std::chrono::seconds(1));
AddVal(5.6);
std::this_thread::sleep_for(std::chrono::seconds(1));
StopQ();
th.join();
}
如果您真的想处理当前队列中的所有内容,请先提取所有内容然后释放锁,然后处理您提取的内容。只需换入另一个 std::queue
,即可快速从队列中提取所有内容。示例:
#include <atomic>
std::atomic<bool> stop_q{}; // needs to be atomic in this version
void UpdateFunc() {
while(not stop_q) {
std::queue<double> work; // this will be used to swap with some_q
{
std::unique_lock lock{mu_q};
// cv_q.wait lets others acquire the lock to work with the queue
// while it waits to be notified.
while (not stop_q && some_q.empty()) cv_q.wait(lock);
std::swap(work, some_q); // extract everything from the queue at once
} // lock released so others can use the queue
// do time consuming work here
while(not stop_q && not work.empty()) {
auto val = std::move(work.front());
work.pop();
std::cout << "got " << val << '\n';
}
}
}