如何在不同线程上同步函数 运行 的实例(在 C++11 中)?
How to synchronize instances of a function running on different threads (in c++11)?
假设有许多线程由同一函数的循环 运行 个实例组成,但每次迭代的开始都需要同步(因此先完成的线程必须等待最后一个开始新的迭代)。这在 C++11 中如何实现?
...
post 的其余部分正是我尝试过的以及它是如何失败的。
我正在使用一个计数器,"sync",初始设置为 3(线程数)。每个线程在函数结束时都会从该计数器中减去 1 并开始等待。当计数器为0时,表示这3个都完成了一轮,所以主线程会将计数器重置为3,并通知线程唤醒它们。
这在大多数情况下都有效,但有时一两个线程无法唤醒。
所以这些是全局变量:
mutex syncMutex;
condition_variable syncCV;
int sync;
这是在线程中循环运行的函数的末尾:
unique_lock<mutex> lk(syncMutex);
cout << "Thread num: " << mFieldNum << " got sync value: " << sync;
sync --;
syncCV.notify_all();
cout << " and goes to sleep..." << endl;
syncCV.wait(lk, []{return sync == numFields;});
cout << "Thread num: " << mFieldNum << " woke up" << endl;
}
这在主线程中循环运行:
unique_lock<mutex> lk(syncMutex);
syncCV.wait(lk, []{return sync == 0;});
sync = 3;
lk.unlock();
cout << "Notifying all threads!" << endl;
syncCV.notify_all();
这是它失败时产生的输出(线程 #3 没有唤醒):
Thread num: 1 got sync value: 3 and goes to sleep...
Thread num: 2 got sync value: 2 and goes to sleep...
Thread num: 3 got sync value: 1 and goes to sleep...
Notifying all threads!
Thread num: 1 woke up
Thread num: 2 woke up
Thread num: 3 woke up
Thread num: 2 got sync value: 3 and goes to sleep...
Thread num: 1 got sync value: 2 and goes to sleep...
Thread num: 3 got sync value: 1 and goes to sleep...
Notifying all threads!
Thread num: 2 woke up
Thread num: 1 woke up
Thread num: 2 got sync value: 3 and goes to sleep...
Thread num: 1 got sync value: 2 and goes to sleep...
有人知道吗?感谢您阅读。
您的线程同步有很多问题。托尼在他的评论中提到了一个。在调用 syncCV.notify_all() 之前调用 lk.unlock() 的主循环代码中也存在潜在的竞争条件。 (这可能允许线程错过 notify_all 信号。)
我会通过两种方式调整您的代码。首先,要解决使用 "sync == numFields" 作为您的条件的问题,正如 Tony 指出的那样,在另一个线程执行 sync-- 之后可能不成立,使用每个线程 [=40 作为您的条件是有意义的=] 每个主线程循环只有一次。在我的示例代码中,这是通过引入 "done[numFields]" 变量来实现的。其次,引入两个条件变量是有意义的——一个向工作线程发出信号,表明新的主循环迭代已经开始,第二个向主线程发出信号,表明工作线程已完成。 (注意两个条件变量使用相同的互斥体。)
这是一个完整的程序,以您的示例代码为模型,结合了这两种方法:
#include <iostream>
using std::cout;
using std::endl;
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
std::mutex syncMutex;
std::condition_variable readyCV;
std::condition_variable doneCV;
int sync;
bool exitFlag;
const int numFields = 5;
bool done[numFields];
const int nloops = 10;
void thread_func(int i) {
int mFieldNum = i;
while (true) {
std::unique_lock<std::mutex> lk(syncMutex);
readyCV.wait(lk, [mFieldNum]{return exitFlag || !done[mFieldNum-1];});
if (exitFlag) break;
cout << "Thread num: " << mFieldNum << " woke up, got sync value: " << sync;
if (--sync == 0) doneCV.notify_all();
done[mFieldNum-1] = true;
readyCV.notify_all();
cout << " and goes to sleep..." << endl;
}
}
int main (int argc, char* argv[]) {
exitFlag = false;
sync = 0;
std::vector<std::thread> threads;
for (int i = 0; i < numFields; i++) {
done[i] = true;
threads.emplace_back (thread_func, i+1);
}
for (int i = 0; i <= nloops; i++) {
std::unique_lock<std::mutex> lk(syncMutex);
doneCV.wait(lk, []{return sync == 0;});
cout << "main loop (lk held), i = " << i << endl;
sync = numFields;
if (i == nloops) exitFlag = true;
else for (auto &b : done) b = false;
cout << "Notifying all threads!" << endl;
readyCV.notify_all();
}
for (auto& t : threads) t.join();
}
(我还添加了一个 exitFlag 和 std::thread::join(),这样程序就可以很好地清理和终止。)
这与经典的生产者-消费者实现(一个生产者,numFields 个消费者)非常相似,增加了每个消费者线程在每个生产者线程循环中仅 运行 一次的约束。
如果您愿意放弃重用工作线程,您也可以更简单地实现本质上相同的程序逻辑。 (在您的示例代码和我上面的示例中,它们充当一种专用线程池。)在我的下一个示例中,为主循环的每次迭代创建新线程。这使得线程同步更简单,并消除了条件变量。
#include <iostream>
using std::cout;
using std::endl;
#include <atomic>
#include <mutex>
#include <thread>
#include <vector>
std::mutex coutMutex;
std::atomic<int> sync;
const int numFields = 5;
bool done[numFields];
const int nloops = 10;
void thread_func(int i) {
int mFieldNum = i;
int mySync = sync--;
{
std::lock_guard<std::mutex> lk(coutMutex);
cout << "Thread num: " << mFieldNum << " woke up, got sync value: " << mySync << endl;
}
}
int main (int argc, char* argv[]) {
for (int i = 0; i < nloops; i++) {
cout << "main loop, i = " << i << endl;
std::vector<std::thread> threads;
sync = numFields;
for (int i = 0; i < numFields; i++) threads.emplace_back (thread_func, i+1);
for (auto& t : threads) t.join();
}
}
(coutMutex 很不错,控制台输出不会出现乱码,但对于核心同步逻辑来说不是必需的。)
如果在您的实际用例中您不需要 thread_func 在迭代之间保持活动状态(例如,保留某些状态),并且如果每次调用 thread_func 做了足够的工作以至于创建一个新线程的成本 运行 相比之下并不重要,然后为每个主循环迭代创建新线程(而不是重用线程)是直接、明智和简单的.
快乐的多线程黑客!
K。弗兰克
假设有许多线程由同一函数的循环 运行 个实例组成,但每次迭代的开始都需要同步(因此先完成的线程必须等待最后一个开始新的迭代)。这在 C++11 中如何实现?
...
post 的其余部分正是我尝试过的以及它是如何失败的。
我正在使用一个计数器,"sync",初始设置为 3(线程数)。每个线程在函数结束时都会从该计数器中减去 1 并开始等待。当计数器为0时,表示这3个都完成了一轮,所以主线程会将计数器重置为3,并通知线程唤醒它们。
这在大多数情况下都有效,但有时一两个线程无法唤醒。
所以这些是全局变量:
mutex syncMutex;
condition_variable syncCV;
int sync;
这是在线程中循环运行的函数的末尾:
unique_lock<mutex> lk(syncMutex);
cout << "Thread num: " << mFieldNum << " got sync value: " << sync;
sync --;
syncCV.notify_all();
cout << " and goes to sleep..." << endl;
syncCV.wait(lk, []{return sync == numFields;});
cout << "Thread num: " << mFieldNum << " woke up" << endl;
}
这在主线程中循环运行:
unique_lock<mutex> lk(syncMutex);
syncCV.wait(lk, []{return sync == 0;});
sync = 3;
lk.unlock();
cout << "Notifying all threads!" << endl;
syncCV.notify_all();
这是它失败时产生的输出(线程 #3 没有唤醒):
Thread num: 1 got sync value: 3 and goes to sleep...
Thread num: 2 got sync value: 2 and goes to sleep...
Thread num: 3 got sync value: 1 and goes to sleep...
Notifying all threads!
Thread num: 1 woke up
Thread num: 2 woke up
Thread num: 3 woke up
Thread num: 2 got sync value: 3 and goes to sleep...
Thread num: 1 got sync value: 2 and goes to sleep...
Thread num: 3 got sync value: 1 and goes to sleep...
Notifying all threads!
Thread num: 2 woke up
Thread num: 1 woke up
Thread num: 2 got sync value: 3 and goes to sleep...
Thread num: 1 got sync value: 2 and goes to sleep...
有人知道吗?感谢您阅读。
您的线程同步有很多问题。托尼在他的评论中提到了一个。在调用 syncCV.notify_all() 之前调用 lk.unlock() 的主循环代码中也存在潜在的竞争条件。 (这可能允许线程错过 notify_all 信号。)
我会通过两种方式调整您的代码。首先,要解决使用 "sync == numFields" 作为您的条件的问题,正如 Tony 指出的那样,在另一个线程执行 sync-- 之后可能不成立,使用每个线程 [=40 作为您的条件是有意义的=] 每个主线程循环只有一次。在我的示例代码中,这是通过引入 "done[numFields]" 变量来实现的。其次,引入两个条件变量是有意义的——一个向工作线程发出信号,表明新的主循环迭代已经开始,第二个向主线程发出信号,表明工作线程已完成。 (注意两个条件变量使用相同的互斥体。)
这是一个完整的程序,以您的示例代码为模型,结合了这两种方法:
#include <iostream>
using std::cout;
using std::endl;
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
std::mutex syncMutex;
std::condition_variable readyCV;
std::condition_variable doneCV;
int sync;
bool exitFlag;
const int numFields = 5;
bool done[numFields];
const int nloops = 10;
void thread_func(int i) {
int mFieldNum = i;
while (true) {
std::unique_lock<std::mutex> lk(syncMutex);
readyCV.wait(lk, [mFieldNum]{return exitFlag || !done[mFieldNum-1];});
if (exitFlag) break;
cout << "Thread num: " << mFieldNum << " woke up, got sync value: " << sync;
if (--sync == 0) doneCV.notify_all();
done[mFieldNum-1] = true;
readyCV.notify_all();
cout << " and goes to sleep..." << endl;
}
}
int main (int argc, char* argv[]) {
exitFlag = false;
sync = 0;
std::vector<std::thread> threads;
for (int i = 0; i < numFields; i++) {
done[i] = true;
threads.emplace_back (thread_func, i+1);
}
for (int i = 0; i <= nloops; i++) {
std::unique_lock<std::mutex> lk(syncMutex);
doneCV.wait(lk, []{return sync == 0;});
cout << "main loop (lk held), i = " << i << endl;
sync = numFields;
if (i == nloops) exitFlag = true;
else for (auto &b : done) b = false;
cout << "Notifying all threads!" << endl;
readyCV.notify_all();
}
for (auto& t : threads) t.join();
}
(我还添加了一个 exitFlag 和 std::thread::join(),这样程序就可以很好地清理和终止。)
这与经典的生产者-消费者实现(一个生产者,numFields 个消费者)非常相似,增加了每个消费者线程在每个生产者线程循环中仅 运行 一次的约束。
如果您愿意放弃重用工作线程,您也可以更简单地实现本质上相同的程序逻辑。 (在您的示例代码和我上面的示例中,它们充当一种专用线程池。)在我的下一个示例中,为主循环的每次迭代创建新线程。这使得线程同步更简单,并消除了条件变量。
#include <iostream>
using std::cout;
using std::endl;
#include <atomic>
#include <mutex>
#include <thread>
#include <vector>
std::mutex coutMutex;
std::atomic<int> sync;
const int numFields = 5;
bool done[numFields];
const int nloops = 10;
void thread_func(int i) {
int mFieldNum = i;
int mySync = sync--;
{
std::lock_guard<std::mutex> lk(coutMutex);
cout << "Thread num: " << mFieldNum << " woke up, got sync value: " << mySync << endl;
}
}
int main (int argc, char* argv[]) {
for (int i = 0; i < nloops; i++) {
cout << "main loop, i = " << i << endl;
std::vector<std::thread> threads;
sync = numFields;
for (int i = 0; i < numFields; i++) threads.emplace_back (thread_func, i+1);
for (auto& t : threads) t.join();
}
}
(coutMutex 很不错,控制台输出不会出现乱码,但对于核心同步逻辑来说不是必需的。)
如果在您的实际用例中您不需要 thread_func 在迭代之间保持活动状态(例如,保留某些状态),并且如果每次调用 thread_func 做了足够的工作以至于创建一个新线程的成本 运行 相比之下并不重要,然后为每个主循环迭代创建新线程(而不是重用线程)是直接、明智和简单的.
快乐的多线程黑客!
K。弗兰克