使用条件变量一次触发一个线程

Using condition variable to trigger threads one at a time

鉴于此示例代码取自 cplusplus.com

并进行了修改
#include "stdafx.h"
// condition_variable example
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void print_id(int id) {
    std::unique_lock<std::mutex> lck(mtx);
    while (!ready)
    {
        std::cout << "waiting for unlock";
        cv.wait(lck);
    }

    std::cout << "thread " << id << '\n';
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    cv.notify_one();
}

void go() {
    std::unique_lock<std::mutex> lck(mtx);
    ready = true;
    cv.notify_one();
}

int main()
{
    std::thread threads[10];
    // spawn 10 threads:
    for (int i = 0; i<10; ++i)
        threads[i] = std::thread(print_id, i);

    std::cout << "10 threads ready to race...\n";
    go();                       // go!

    for (auto& th : threads) th.join();

    return 0;
}

这是否正确地阻塞了每个线程,直到一个线程被 notify_one() 调用唤醒?或者这实际上是在解锁所有线程?

我试图同时启动所有线程,然后阻塞直到前一个线程完成。顺序并不重要,但我不能让他们同时执行所有代码。

cppreference 后:

void notify_one();

If any threads are waiting on *this, calling notify_one unblocks one of the waiting threads.

所以它只解锁了一个线程,你不能说是哪一个。为了解锁所有线程,您需要使用 void notify_all();

如果您尝试一次 运行 所有线程,您应该在 go() 中使用 notify_all(),然后在 notify_one() 中使用顺序 运行 =15=]

请记住,即使使用 notify_all() 所有线程都会尝试锁定 lck,但只有一个会成功。

从没有两个工作线程可以执行此代码的意义上说,您的代码是有效的:

  std::cout << "thread " << id << '\n';
  std::this_thread::sleep_for(std::chrono::milliseconds(2000));
  cv.notify_one();

并发,但这并不完全是因为您使用 std::condition_variable 一次解除对一个工作线程的阻塞。事实上,您并没有真正改变原始 cplusplus.com 示例中的行为。在这两个程序中,代码的关键部分都由一个锁定的互斥锁保护,这保证了任何时候只有一个线程可以持有锁。

您使用 std::condition_variable 到 "serialize" 线程的执行本身并不能阻止并发执行,因为调用 notify_one() 不能保证只有一个线程从 wait()。原因是允许线程在没有任何通知的情况下从 wait() 伪造 return:

The function will unblock when signaled by a call to notify_one() or a call to notify_all(), or spuriously.

从某种意义上说,您正在被互斥锁拯救。如果你这样写你的辅助函数:

void print_id(int id) {
    {
        std::unique_lock<std::mutex> lck(mtx);
        while (!ready)
        {
            std::cout << "waiting for unlock";
            cv.wait(lck);
        }
    }

    std::cout << "thread " << id << '\n';
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    cv.notify_one();
}

此代码会在线程解除阻塞后立即解锁互斥量。这并不安全,因为多个线程可以在没有 锁定互斥锁保护的情况下到达您的临界区

所以我认为您的程序会做您想做的事情 - 在没有并发的情况下在多个线程中运行一段代码 - 但可能不是出于您期望的原因。


按条件阻塞的惯用方法是定义一个谓词测试,当线程准备好继续时通过,否则失败,并在循环中检查它:

std::unique_lock<std::mutex> lock(mutex);
while (!predicate)
    condition.wait(lock);

这个习惯用法可以正确处理虚假唤醒,因为谓词测试将失败并且线程将再次调用 wait()

尽管您的程序看起来与此非常相似,但它并没有完全做到这一点,因为您的谓词允许所有线程继续进行,而不仅仅是一个线程,这不是您所说的您想要的。由于锁定的互斥体,它无论如何都能工作。

您可以通过更改谓词测试使其仅针对下一个线程通过,并使用 notify_all():

来按顺序一次解锁线程
#include <atomic>
...
std::atomic<int> ready(-1);

void print_id(int id) {
   {
      std::unique_lock<std::mutex> lck(mtx);
      while (ready != id)
         cv.wait(lck);
   }

   std::cout << "thread " << id << '\n';
   std::this_thread::sleep_for(std::chrono::milliseconds(2000));
   ++ready;
   cv.notify_all();
}

void go() {
   ready = 0;
   cv.notify_all();
}

int main()
{
   std::thread threads[10];
   // spawn 10 threads:
   for (int i = 0; i<10; ++i)
      threads[i] = std::thread(print_id, i);

   std::cout << "10 threads ready to race...\n";
   go();                       // go!

   for (auto& th : threads) th.join();

   return 0;
}

请注意这个谓词测试 ready != id 如何仅在线程应该继续时通过。我还为 ready 使用了 std::atomic<int> 以便 notification does not require locking the mutex.

修改后的谓词代码是正确的,但是一个缺点是我们必须将notify_one()更改为notify_all()以确保唤醒下一个线程。这会唤醒所有线程,只是让除其中一个以外的所有线程重新等待,这会消耗一点性能。优化它的一种方法是创建 N condition_variable 个实例(例如在数组中)并让每个线程等待自己的 condition_variable 个实例。