返回布尔标志的 Lambda 表达式不停止条件变量 wait() 函数

Lambda Expression returning a bool flag not stopping condition variables wait() function

我有一个 WorkDispatcher class 将 Worker class 对象作为属性保存并在新线程中启动它们的功能。 这是一个例子:

WorkDispatcher.h:

class WorkDispatcher
{

private:
    std::thread m_reconstructionThread;
    std::shared_ptr <Reconstruction::RGBDImageModel> m_currentRGBD;

public:
    WorkDispatcher();

    std::mutex m_rgbdMutex, m_commandMutex;
    std::deque<Network::NetworkCommandModel> m_CommandQueue;
    std::condition_variable m_RgbConditional, m_CommandConditional;

    Reconstruction::SceneReconstructor m_Reconstructor;

    void Initialize();
    void Work();
    void Stop();

};

WorkDispatcher.cpp:

void WorkDispatcher::Work()
{    
    m_reconstructionThread = std::thread(
        &Reconstruction::SceneReconstructor::Reconstruct, 
        std::ref(m_Reconstructor),
        std::ref(m_currentRGBD),
        std::ref(m_CommandQueue),
        std::ref(m_rgbdMutex), 
        std::ref(m_RgbConditional), 
        std::ref(m_commandMutex), 
        std::ref(m_CommandConditional)
    );
}

这些函数保持无限循环,我使用条件变量等待直到工作可用。例如我的重建函数:

void SceneReconstructor::Reconstruct(
    std::shared_ptr<RGBDImageModel>& currentImage,
    std::deque<Network::NetworkCommandModel> commandQueue,
    std::mutex& rgbdMutex,
    std::condition_variable& rgbdCond,
    std::mutex& commandMutex,
    std::condition_variable& commandConditional)
{
    while (true)
    {
        std::unique_lock<std::mutex> rgbdLocker(rgbdMutex);
        rgbdCond.wait(rgbdLocker, [this] {return m_QuitReconstructionFlag; });
        
        // Quit flag to break out of loop
        if (m_QuitReconstructionFlag)
            break;

        // do stuff here

    }
}

到目前为止一切顺利,但是如果我想退出应用程序,我需要退出我的所有工作线程。如上所示,为此这些 classes 有一个退出标志,我使用如下:

void WorkDispatcher::Stop()
{
    // for all worker threads do this
    m_Reconstructor.m_QuitReconstructionFlag = true;
    if (m_reconstructionThread.joinable())
        m_reconstructionThread.join();
}

理论上这应该在工作线程循环中停止 wait() 函数,然后用 m_QuitReconstructionFlag 跳出循环,但这不起作用。

以下是有效的:

  1. 从等待函数中删除 lambda
  2. 在设置后对条件变量调用notify_all() 退出标志为真;

这对我来说很好,但问题是,为什么 lambda 不起作用?

why doesn't the lambda work?

它本身工作得很好。

但是,C++ 需要遵循复杂的规则才能正确地同步 多个执行线程。仅仅因为一个执行线程将特定变量设置为一个值并不能以任何方式保证其他执行线程将看到该变量的新值。同步规则管理该行为。

所以,这个 lambda 可以正常工作。在它自己的执行线程中。但是,如果您希望此 lambda 观察其他执行线程对值所做的更改,则必须正确 synchronized.

此外,如果您查看 wait() 的文档,您应该会找到详细的解释,说明如果条件函数的计算结果为 false ,它会不会被再次调用,直到条件变量被通知.

What does work is ... call notify_all()

当然可以。由于wait()需要通知条件变量,在它再次检查等待条件之前,这就是你必须做的!

最后,通知条件变量将正常工作在大多数情况下,但是,正如我提到的,同步规则(其中互斥锁和条件变量起着重要的作用)有一些边缘情况,在这些情况下,这本身是行不通的。您必须严格遵循以下事件序列 才能在所有边缘情况下进行正确的同步:

  1. 锁定另一个执行线程在等待其条件变量之前锁定的 same 互斥锁。
  2. 通知条件变量。
  3. 解锁互斥量

这里 lambda 返回 true 不是停止等待的条件,而是 lambda 是为了解释虚假唤醒。 conditional_variablenotifynotify_all 函数用于使等待退出。

无需删除 lambda,您只需将停止函数更改为

void WorkDispatcher::Stop()
{
    // for all worker threads do this
    m_Reconstructor.m_QuitReconstructionFlag = true;
    m_RgbConditional.notify_all() 
    if (m_reconstructionThread.joinable())
        m_reconstructionThread.join();
}

here 你可以看到带有谓词传递给它的 wait (wait(predicate)) 等同于

if(!predicate())
    wait()

因此当你调用Stop()时,它会将谓词设置为真,所以当线程被唤醒时wait() returns,并检查谓词,如果它是真的, wait(predicate) returns.

在前面的例子中,predicate 被设置为 true 但函数没有被唤醒

必须使用条件变量 wait 使用的相同互斥体保护 m_QuitReconstructionFlag

否则不行

如果您不想非常详细地了解 C++ 内存模型,则在使用条件变量时,您应该遵循“最佳实践”以防止出现问题。

条件变量的最佳做法是将 3 个东西捆绑在一起。

  1. 条件变量。
  2. 互斥体(通常是可变的)。
  3. 一个州。

然后将所有 3 个都捆绑在某种抽象之后。

改变状态:

  1. 锁定互斥量
  2. 改变状态
  3. 适当通知条件变量
  4. 解锁互斥体

不要认为状态是原子的就意味着你不必锁定互斥量。

当你想在条件变量上等待时:

  1. 锁定互斥量
  2. 等等,正在传递检查状态的 lambda。
  3. 退出等待时,您可以自由更新状态。
  4. 解锁互斥体

一般来说,在上述所有情况下,都使用unique_lock来锁定互斥量,并依靠RAII来解锁它。

具体的状态以及何时通知,由您决定。

不要直接在此包和 api 之外与该互斥锁交互,不要直接在此包和 api 之外与状态交互,并且不要与条件交互此包之外的变量和 api.

如果需要,将数据复制或移出 api,不要将指针、引用或迭代器保存到其中。

你的状态可以有不止一个变量。比如,你可以有一个队列和一个用于关闭的布尔值。

例如,假设您有一个队列。

template<class T>
struct cv_queue {
  std::optional<T> pop() {
    auto l = lock();
    cv.wait( l, [&]{ return aborted || !queue.empty(); } );
    if (aborted) return {};
    auto retval = std::move(queue.front());
    queue.pop_front();
    return retval;
  }
  void push( T in ) {
    auto l = lock();
    queue.push_back( std::move(in) );
    cv.notify_one();
  }
  void abort_everything() {
    auto l = lock();
    abort = true;
    cv.notify_all();
  }
  bool empty() const {
    auto l = lock();
    return queue.empty();
  }
private:
  std::condition_variable cv;
  mutable std::mutex m;
  std::deque<T> queue;
  bool aborted=false;

  auto lock() const { return std::unique_lock( m ); }
};

添加 pop_wait_fortry_pop 并不难。

一个简单的 3 部分包装器,围绕数据或任何不难编写的内容。根据我的经验,让它更通用并不会增加它的可理解性。