使用单个条件变量暂停多个线程
Using a single Condition Variable to pause multiple threads
我有一个启动 N 个线程的程序 (async/future)。我想让主线程设置一些数据,然后所有线程都应该运行,而主线程等待所有其他线程完成,然后这需要循环。
我的atm是这样的
int main()
{
//Start N new threads (std::future/std::async)
while(condition)
{
//Set Up Data Here
//Send Data to threads
{
std::lock_guard<std::mutex> lock(mrun);
bRun = true;
}
run.notify_all();
//Wait for threads
{
std::unique_lock<std::mutex> lock(mrun);
run.wait(lock, [] {return bDone; });
}
//Reset bools
bRun = false;
bDone = false;
}
//Get results from futures once complete
}
int thread()
{
while(otherCondition)
{
std::unique_lock<std::mutex> lock(mrun);
run.wait(lock, [] {return bRun; });
bDone = true;
//Do thread stuff here
lock.unlock();
run.notify_all();
}
}
但我看不到任何主线程或其他线程相互等待的迹象!知道我做错了什么或我该怎么做吗?
有几个问题。首先,您要在第一个工人醒来后立即设置 bDone
。因此主线程立即唤醒并开始准备下一个数据集。您想让主线程等到 all worker finished 处理他们的数据。其次,当一个worker完成处理后,它循环并立即检查bRun
。但是它无法判断 bRun == true
是否意味着下一个数据集准备好了,或者 last 数据集是否准备好了。您想等待下一个数据集。
像这样的东西应该可以工作:
std::mutex mrun;
std::condition_variable dataReady;
std::condition_variable workComplete;
int nCurrentIteration = 0;
int nWorkerCount = 0;
int main()
{
//Start N new threads (std::future/std::async)
while(condition)
{
//Set Up Data Here
//Send Data to threads
{
std::lock_guard<std::mutex> lock(mrun);
nWorkerCount = N;
++nCurrentIteration;
}
dataReady.notify_all();
//Wait for threads
{
std::unique_lock<std::mutex> lock(mrun);
workComplete.wait(lock, [] { return nWorkerCount == 0; });
}
}
//Get results from futures once complete
}
int thread()
{
int nNextIteration == 1;
while(otherCondition)
{
std::unique_lock<std::mutex> lock(mrun);
dataReady.wait(lock, [&nNextIteration] { return nCurrentIteration==nNextIteration; });
lock.unlock();
++nNextIteration;
//Do thread stuff here
lock.lock();
if (--nWorkerCount == 0)
{
lock.unlock();
workComplete.notify_one();
}
}
}
请注意,此解决方案并不十分完整。如果一个worker遇到异常,那么主线程就会挂掉(因为死掉的worker永远不会reducenWorkerCount
)。您可能需要一种策略来应对这种情况。
顺便说一下,这个模式叫做 barrier
。
我有一个启动 N 个线程的程序 (async/future)。我想让主线程设置一些数据,然后所有线程都应该运行,而主线程等待所有其他线程完成,然后这需要循环。
我的atm是这样的
int main()
{
//Start N new threads (std::future/std::async)
while(condition)
{
//Set Up Data Here
//Send Data to threads
{
std::lock_guard<std::mutex> lock(mrun);
bRun = true;
}
run.notify_all();
//Wait for threads
{
std::unique_lock<std::mutex> lock(mrun);
run.wait(lock, [] {return bDone; });
}
//Reset bools
bRun = false;
bDone = false;
}
//Get results from futures once complete
}
int thread()
{
while(otherCondition)
{
std::unique_lock<std::mutex> lock(mrun);
run.wait(lock, [] {return bRun; });
bDone = true;
//Do thread stuff here
lock.unlock();
run.notify_all();
}
}
但我看不到任何主线程或其他线程相互等待的迹象!知道我做错了什么或我该怎么做吗?
有几个问题。首先,您要在第一个工人醒来后立即设置 bDone
。因此主线程立即唤醒并开始准备下一个数据集。您想让主线程等到 all worker finished 处理他们的数据。其次,当一个worker完成处理后,它循环并立即检查bRun
。但是它无法判断 bRun == true
是否意味着下一个数据集准备好了,或者 last 数据集是否准备好了。您想等待下一个数据集。
像这样的东西应该可以工作:
std::mutex mrun;
std::condition_variable dataReady;
std::condition_variable workComplete;
int nCurrentIteration = 0;
int nWorkerCount = 0;
int main()
{
//Start N new threads (std::future/std::async)
while(condition)
{
//Set Up Data Here
//Send Data to threads
{
std::lock_guard<std::mutex> lock(mrun);
nWorkerCount = N;
++nCurrentIteration;
}
dataReady.notify_all();
//Wait for threads
{
std::unique_lock<std::mutex> lock(mrun);
workComplete.wait(lock, [] { return nWorkerCount == 0; });
}
}
//Get results from futures once complete
}
int thread()
{
int nNextIteration == 1;
while(otherCondition)
{
std::unique_lock<std::mutex> lock(mrun);
dataReady.wait(lock, [&nNextIteration] { return nCurrentIteration==nNextIteration; });
lock.unlock();
++nNextIteration;
//Do thread stuff here
lock.lock();
if (--nWorkerCount == 0)
{
lock.unlock();
workComplete.notify_one();
}
}
}
请注意,此解决方案并不十分完整。如果一个worker遇到异常,那么主线程就会挂掉(因为死掉的worker永远不会reducenWorkerCount
)。您可能需要一种策略来应对这种情况。
顺便说一下,这个模式叫做 barrier
。