在无限循环或更新函数中锁定互斥锁有多糟糕

How bad it is to lock a mutex in an infinite loop or an update function

std::queue<double> some_q;
std::mutex mu_q;

/* an update function may be an event observer */
void UpdateFunc()
{
    /* some other processing */
    std::lock_guard lock{ mu_q };
    while (!some_q.empty())
    {
        const auto& val = some_q.front();
        /* update different states according to val */
        some_q.pop();
    }
    /* some other processing */
}

/* some other thread might add some values after processing some other inputs */
void AddVal(...)
{
    std::lock_guard lock{ mu_q };
    some_q.push(...);
}

对于这种情况,可以这样处理队列吗? 或者如果我尝试使用像 boost 一样的无锁队列会更好吗?

您可以像当前假设在所有线程中正确使用锁一样使用它。但是,您可能 运行 对如何调用 updateFunc() 感到沮丧。

  • 您要使用回调吗?
  • 您要使用 ISR 吗?
  • 你要投票吗?

如果您使用第 3 方库,它通常会使线程同步和队列变得微不足道

例如,如果您使用 CMSIS RTOS(v2)。让多个线程在彼此之间传递信息是一个相当直接的过程。您可以有多个生产者和一个消费者。

单个消费者可以在一个永远的循环中等待,在该循环中等待接收消息,然后再执行其工作

when timeout is set to osWaitForever the function will wait for an infinite time until the message is retrieved (i.e. wait semantics).

// Two producers
osMessageQueuePut(X,Y,Z,timeout=0)
osMessageQueuePut(X,Y,Z,timeout=0)
    
// One consumer which will run only once something enters the queue
osMessageQueueGet(X,Y,Z,osWaitForever)

tldr;您可以安全地继续,但使用库可能会使您的同步问题更容易。

How bad it is to lock a mutex in an infinite loop or an update function

这很糟糕。无限循环实际上使您的程序具有 未定义的行为,除非它执行以下操作之一:

  • 终止
  • 调用库 I/O 函数
  • 通过 volatile glvalue 执行访问
  • 执行同步操作或原子操作

在进入循环之前获取互斥锁并持有它不算执行同步操作(在循环中)。此外,当持有互斥量时,没有人可以向队列中添加信息,因此在处理您提取的信息时,所有想要添加到队列中的线程都必须等待 - 并且没有其他想要分担负载的工作线程可以从排队。通常最好从队列中提取一项任务,释放锁,然后使用您得到的任务。

常用的方法是使用一个condition_variable让其他线程获取锁,然后通知其他等待相同condition_variable的线程。 CPU 在等待时将非常接近空闲,并在需要时唤醒以完成工作。

以您的程序为基础,它可能看起来像这样:

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

std::queue<double> some_q;
std::mutex mu_q;
std::condition_variable cv_q; // the condition variable
bool stop_q = false;          // something to signal the worker thread to quit

/* an update function may be an event observer */
void UpdateFunc() {
    while(true) {
        double val;
        {
            std::unique_lock lock{mu_q};

            // cv_q.wait lets others acquire the lock to work with the queue
            // while it waits to be notified.
            while (not stop_q && some_q.empty()) cv_q.wait(lock);

            if(stop_q) break; // time to quit

            val = std::move(some_q.front());
            some_q.pop();
        } // lock released so others can use the queue

        // do time consuming work with "val" here
        std::cout << "got " << val << '\n';
    }
}

/* some other thread might add some values after processing some other inputs */
void AddVal(double val) {
    std::lock_guard lock{mu_q};
    some_q.push(val);
    cv_q.notify_one(); // notify someone that there's a new value to work with
}

void StopQ() { // a function to set the queue in shutdown mode
    std::lock_guard lock{mu_q};
    stop_q = true;
    cv_q.notify_all(); // notify all that it's time to stop
}

int main() {
    auto th = std::thread(UpdateFunc);
    
    // simulate some events coming with some time apart
    std::this_thread::sleep_for(std::chrono::seconds(1));
    AddVal(1.2);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    AddVal(3.4);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    AddVal(5.6);
    std::this_thread::sleep_for(std::chrono::seconds(1));

    StopQ();    
    th.join();
}

如果您真的想处理当前队列中的所有内容,请先提取所有内容然后释放锁,然后处理您提取的内容。只需换入另一个 std::queue,即可快速从队列中提取所有内容。示例:

#include <atomic>

std::atomic<bool> stop_q{}; // needs to be atomic in this version

void UpdateFunc() {
    while(not stop_q) {
        std::queue<double> work; // this will be used to swap with some_q
        {
            std::unique_lock lock{mu_q};

            // cv_q.wait lets others acquire the lock to work with the queue
            // while it waits to be notified.
            while (not stop_q && some_q.empty()) cv_q.wait(lock);

            std::swap(work, some_q); // extract everything from the queue at once
        } // lock released so others can use the queue

        // do time consuming work here
        while(not stop_q && not work.empty()) {
            auto val = std::move(work.front());
            work.pop();

            std::cout << "got " << val << '\n';
        }
    }
}