使用条件变量一次触发一个线程
Using condition variable to trigger threads one at a time
鉴于此示例代码取自 cplusplus.com
并进行了修改
#include "stdafx.h"
// condition_variable example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void print_id(int id) {
std::unique_lock<std::mutex> lck(mtx);
while (!ready)
{
std::cout << "waiting for unlock";
cv.wait(lck);
}
std::cout << "thread " << id << '\n';
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
cv.notify_one();
}
void go() {
std::unique_lock<std::mutex> lck(mtx);
ready = true;
cv.notify_one();
}
int main()
{
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i<10; ++i)
threads[i] = std::thread(print_id, i);
std::cout << "10 threads ready to race...\n";
go(); // go!
for (auto& th : threads) th.join();
return 0;
}
这是否正确地阻塞了每个线程,直到一个线程被 notify_one() 调用唤醒?或者这实际上是在解锁所有线程?
我试图同时启动所有线程,然后阻塞直到前一个线程完成。顺序并不重要,但我不能让他们同时执行所有代码。
cppreference 后:
void notify_one();
If any threads are waiting on *this, calling notify_one unblocks one of the waiting threads.
所以它只解锁了一个线程,你不能说是哪一个。为了解锁所有线程,您需要使用 void notify_all();
如果您尝试一次 运行 所有线程,您应该在 go()
中使用 notify_all()
,然后在 notify_one()
中使用顺序 运行 =15=]
请记住,即使使用 notify_all()
所有线程都会尝试锁定 lck
,但只有一个会成功。
从没有两个工作线程可以执行此代码的意义上说,您的代码是有效的:
std::cout << "thread " << id << '\n';
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
cv.notify_one();
并发,但这并不完全是因为您使用 std::condition_variable
一次解除对一个工作线程的阻塞。事实上,您并没有真正改变原始 cplusplus.com 示例中的行为。在这两个程序中,代码的关键部分都由一个锁定的互斥锁保护,这保证了任何时候只有一个线程可以持有锁。
您使用 std::condition_variable
到 "serialize" 线程的执行本身并不能阻止并发执行,因为调用 notify_one()
不能保证只有一个线程从 wait()
。原因是允许线程在没有任何通知的情况下从 wait()
伪造 return:
The function will unblock when signaled by a call to notify_one()
or a
call to notify_all()
, or spuriously.
从某种意义上说,您正在被互斥锁拯救。如果你这样写你的辅助函数:
void print_id(int id) {
{
std::unique_lock<std::mutex> lck(mtx);
while (!ready)
{
std::cout << "waiting for unlock";
cv.wait(lck);
}
}
std::cout << "thread " << id << '\n';
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
cv.notify_one();
}
此代码会在线程解除阻塞后立即解锁互斥量。这并不安全,因为多个线程可以在没有 锁定互斥锁保护的情况下到达您的临界区。
所以我认为您的程序会做您想做的事情 - 在没有并发的情况下在多个线程中运行一段代码 - 但可能不是出于您期望的原因。
按条件阻塞的惯用方法是定义一个谓词测试,当线程准备好继续时通过,否则失败,并在循环中检查它:
std::unique_lock<std::mutex> lock(mutex);
while (!predicate)
condition.wait(lock);
这个习惯用法可以正确处理虚假唤醒,因为谓词测试将失败并且线程将再次调用 wait()
。
尽管您的程序看起来与此非常相似,但它并没有完全做到这一点,因为您的谓词允许所有线程继续进行,而不仅仅是一个线程,这不是您所说的您想要的。由于锁定的互斥体,它无论如何都能工作。
您可以通过更改谓词测试使其仅针对下一个线程通过,并使用 notify_all()
:
来按顺序一次解锁线程
#include <atomic>
...
std::atomic<int> ready(-1);
void print_id(int id) {
{
std::unique_lock<std::mutex> lck(mtx);
while (ready != id)
cv.wait(lck);
}
std::cout << "thread " << id << '\n';
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
++ready;
cv.notify_all();
}
void go() {
ready = 0;
cv.notify_all();
}
int main()
{
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i<10; ++i)
threads[i] = std::thread(print_id, i);
std::cout << "10 threads ready to race...\n";
go(); // go!
for (auto& th : threads) th.join();
return 0;
}
请注意这个谓词测试 ready != id
如何仅在线程应该继续时通过。我还为 ready
使用了 std::atomic<int>
以便 notification does not require locking the mutex.
修改后的谓词代码是正确的,但是一个缺点是我们必须将notify_one()
更改为notify_all()
以确保唤醒下一个线程。这会唤醒所有线程,只是让除其中一个以外的所有线程重新等待,这会消耗一点性能。优化它的一种方法是创建 N condition_variable
个实例(例如在数组中)并让每个线程等待自己的 condition_variable
个实例。
鉴于此示例代码取自 cplusplus.com
并进行了修改#include "stdafx.h"
// condition_variable example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void print_id(int id) {
std::unique_lock<std::mutex> lck(mtx);
while (!ready)
{
std::cout << "waiting for unlock";
cv.wait(lck);
}
std::cout << "thread " << id << '\n';
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
cv.notify_one();
}
void go() {
std::unique_lock<std::mutex> lck(mtx);
ready = true;
cv.notify_one();
}
int main()
{
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i<10; ++i)
threads[i] = std::thread(print_id, i);
std::cout << "10 threads ready to race...\n";
go(); // go!
for (auto& th : threads) th.join();
return 0;
}
这是否正确地阻塞了每个线程,直到一个线程被 notify_one() 调用唤醒?或者这实际上是在解锁所有线程?
我试图同时启动所有线程,然后阻塞直到前一个线程完成。顺序并不重要,但我不能让他们同时执行所有代码。
cppreference 后:
void notify_one();
If any threads are waiting on *this, calling notify_one unblocks one of the waiting threads.
所以它只解锁了一个线程,你不能说是哪一个。为了解锁所有线程,您需要使用 void notify_all();
如果您尝试一次 运行 所有线程,您应该在 go()
中使用 notify_all()
,然后在 notify_one()
中使用顺序 运行 =15=]
请记住,即使使用 notify_all()
所有线程都会尝试锁定 lck
,但只有一个会成功。
从没有两个工作线程可以执行此代码的意义上说,您的代码是有效的:
std::cout << "thread " << id << '\n';
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
cv.notify_one();
并发,但这并不完全是因为您使用 std::condition_variable
一次解除对一个工作线程的阻塞。事实上,您并没有真正改变原始 cplusplus.com 示例中的行为。在这两个程序中,代码的关键部分都由一个锁定的互斥锁保护,这保证了任何时候只有一个线程可以持有锁。
您使用 std::condition_variable
到 "serialize" 线程的执行本身并不能阻止并发执行,因为调用 notify_one()
不能保证只有一个线程从 wait()
。原因是允许线程在没有任何通知的情况下从 wait()
伪造 return:
The function will unblock when signaled by a call to
notify_one()
or a call tonotify_all()
, or spuriously.
从某种意义上说,您正在被互斥锁拯救。如果你这样写你的辅助函数:
void print_id(int id) {
{
std::unique_lock<std::mutex> lck(mtx);
while (!ready)
{
std::cout << "waiting for unlock";
cv.wait(lck);
}
}
std::cout << "thread " << id << '\n';
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
cv.notify_one();
}
此代码会在线程解除阻塞后立即解锁互斥量。这并不安全,因为多个线程可以在没有 锁定互斥锁保护的情况下到达您的临界区。
所以我认为您的程序会做您想做的事情 - 在没有并发的情况下在多个线程中运行一段代码 - 但可能不是出于您期望的原因。
按条件阻塞的惯用方法是定义一个谓词测试,当线程准备好继续时通过,否则失败,并在循环中检查它:
std::unique_lock<std::mutex> lock(mutex);
while (!predicate)
condition.wait(lock);
这个习惯用法可以正确处理虚假唤醒,因为谓词测试将失败并且线程将再次调用 wait()
。
尽管您的程序看起来与此非常相似,但它并没有完全做到这一点,因为您的谓词允许所有线程继续进行,而不仅仅是一个线程,这不是您所说的您想要的。由于锁定的互斥体,它无论如何都能工作。
您可以通过更改谓词测试使其仅针对下一个线程通过,并使用 notify_all()
:
#include <atomic>
...
std::atomic<int> ready(-1);
void print_id(int id) {
{
std::unique_lock<std::mutex> lck(mtx);
while (ready != id)
cv.wait(lck);
}
std::cout << "thread " << id << '\n';
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
++ready;
cv.notify_all();
}
void go() {
ready = 0;
cv.notify_all();
}
int main()
{
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i<10; ++i)
threads[i] = std::thread(print_id, i);
std::cout << "10 threads ready to race...\n";
go(); // go!
for (auto& th : threads) th.join();
return 0;
}
请注意这个谓词测试 ready != id
如何仅在线程应该继续时通过。我还为 ready
使用了 std::atomic<int>
以便 notification does not require locking the mutex.
修改后的谓词代码是正确的,但是一个缺点是我们必须将notify_one()
更改为notify_all()
以确保唤醒下一个线程。这会唤醒所有线程,只是让除其中一个以外的所有线程重新等待,这会消耗一点性能。优化它的一种方法是创建 N condition_variable
个实例(例如在数组中)并让每个线程等待自己的 condition_variable
个实例。