C++ 条件变量,用于发出分离线程执行停顿结束的信号
C++ Condition variable to signal end of detached thread execution stalls
我有一些代码,我正在处理其中生成分离线程的地方,做了一些工作,然后应该等待来自 main()
的信号,然后再将另一个信号发送回 main 指示线程已经退出了。
我对条件变量还很陌生,但是我以前使用过一些多线程代码。 (主要是互斥量。)
这是我尝试实现的,但它的行为与我预期的不同。 (我可能误解了什么。)
这背后的想法是将包含两个标志的结构传递给每个分离的线程。第一个标志表示 main()
表示“可以退出,并退出线程函数的末尾”。第二个标志由线程本身设置,并向 main()
发出线程确实已退出的信号。 (这只是为了确认来自 main()
的信号接收正常并发回一些东西。)
#include <cstdlib> // std::atoi
#include <iostream>
#include <thread>
#include <vector>
#include <random>
#include <future>
#include <condition_variable>
#include <mutex>
struct ThreadStruct
{
int id;
std::condition_variable cv;
std::mutex m;
int ok_to_exit;
int exit_confirm;
};
void Pause()
{
std::cout << "Press enter to continue" << std::endl;
std::cin.get();
}
void detachedThread(ThreadStruct* threadData)
{
std::cout << "START: Detached Thread " << threadData->id << std::endl;
// Performs some arbitrary amount of work.
for(int i = 0; i < 100000; ++ i);
std::cout << "FINISH: Detached thread " << threadData->id << std::endl;
std::unique_lock<std::mutex> lock(threadData->m);
std::cout << "WAIT: Detached thread " << threadData->id << std::endl;
threadData->cv.wait(lock, [threadData]{return threadData->ok_to_exit == 1;});
std::cout << "EXIT: Detached thread " << threadData->id << std::endl;
threadData->exit_confirm = 1;
}
int main(int argc, char** argv)
{
int totalThreadCount = 1;
ThreadStruct* perThreadData = new ThreadStruct[totalThreadCount];
std::cout << "Main thread starting " << totalThreadCount << " thread(s)" << std::endl;
for(int i = totalThreadCount - 1; i >= 0; --i)
{
perThreadData[i].id = i;
perThreadData[i].ok_to_exit = 0;
perThreadData[i].exit_confirm = 0;
std::thread t(detachedThread, &perThreadData[i]);
t.detach();
}
for(int i{0}; i < totalThreadCount; ++i)
{
ThreadStruct *threadData = &perThreadData[i];
std::cout << "Waiting for lock - main() thread" << std::endl;
std::unique_lock<std::mutex> lock(perThreadData[i].m);
std::cout << "Lock obtained - main() thread" << std::endl;
perThreadData[i].cv.wait(lock);
threadData->ok_to_exit = 1;
// added after comment from Sergey
threadData->cv.notify_all();
std::cout << "Done - main() thread" << std::endl;
}
for(int i{0}; i < totalThreadCount; ++i)
{
std::size_t thread_index = i;
ThreadStruct& threadData = perThreadData[thread_index];
std::unique_lock<std::mutex> lock(threadData.m);
std::cout << "i=" << i << std::endl;
int &exit_confirm = threadData.exit_confirm;
threadData.cv.wait(lock, [exit_confirm]{return exit_confirm == 1;});
std::cout << "i=" << i << " finished!" << std::endl;
}
Pause();
return 0;
}
这运行到行:
WAIT: Detached thread 0
但是分离的线程永远不会退出。我做错了什么?
编辑:进一步实验 - 这有帮助吗?
我认为通过删除一个步骤来简化事情可能会有所帮助。在下面的示例中,main()
不会向分离线程发送信号 ,它只是等待来自 分离线程的信号。
但同样,此代码在打印 DROP
后挂起...这意味着分离的线程正常退出,但 main()
不知道。
#include <cstdlib> // std::atoi
#include <iostream>
#include <thread>
#include <vector>
#include <random>
#include <future>
#include <condition_variable>
#include <mutex>
struct ThreadStruct
{
int id;
std::condition_variable cv;
std::mutex m;
int ok_to_exit;
int exit_confirm;
};
void Pause()
{
std::cout << "Press enter to continue" << std::endl;
std::cin.get();
}
void detachedThread(ThreadStruct* threadData)
{
std::cout << "START: Detached Thread " << threadData->id << std::endl;
// Performs some arbitrary amount of work.
for(int i = 0; i < 100000; ++ i);
std::cout << "FINISH: Detached thread " << threadData->id << std::endl;
std::unique_lock<std::mutex> lock(threadData->m);
std::cout << "EXIT: Detached thread " << threadData->id << std::endl;
threadData->exit_confirm = 1;
threadData->cv.notify_all();
std::cout << "DROP" << std::endl;
}
int main(int argc, char** argv)
{
int totalThreadCount = 1;
ThreadStruct* perThreadData = new ThreadStruct[totalThreadCount];
std::cout << "Main thread starting " << totalThreadCount << " thread(s)" << std::endl;
for(int i = totalThreadCount - 1; i >= 0; --i)
{
perThreadData[i].id = i;
perThreadData[i].ok_to_exit = 0;
perThreadData[i].exit_confirm = 0;
std::thread t(detachedThread, &perThreadData[i]);
t.detach();
}
for(int i{0}; i < totalThreadCount; ++i)
{
std::size_t thread_index = i;
ThreadStruct& threadData = perThreadData[thread_index];
std::cout << "Waiting for mutex" << std::endl;
std::unique_lock<std::mutex> lock(threadData.m);
std::cout << "i=" << i << std::endl;
int &exit_confirm = threadData.exit_confirm;
threadData.cv.wait(lock, [exit_confirm]{return exit_confirm == 1;});
std::cout << "i=" << i << " finished!" << std::endl;
}
Pause();
return 0;
}
您的 lambda 正在捕获 by-value,因此它永远不会看到对 exit_confim
.
所做的更改
捕获 by-reference 改为:
int& exit_confirm = threadData.exit_confirm;
threadData.cv.wait(lock, [&exit_confirm] { return exit_confirm == 1; });
// ^
// | capture by-reference
您还需要delete[]
您new[]
所做的
delete[] ThreadStruct;
当您完成 struct
s 时。
我还注意到在 free 之后有一些堆使用,但是当我对代码进行一些简化时,它神奇地消失了。我没有进一步调查。
一些建议:
- 将处理
ThreadStruct
成员变量和锁的代码移至 ThreadStruct
class。它通常使阅读和维护更简单。
- 删除未使用的变量和headers。
- 不要使用
new[]
/delete[]
。对于此示例,您可以改用 std::vector<ThreadStruct>
。
- 根本不要
detach()
- 我在下面没有做任何事情,但我建议使用 join()
(在附加线程上)进行最后的同步。这就是它的用途。
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
struct ThreadStruct {
int id;
// move this function into the ThreadStruct class
void detachedThread() {
std::cout << "START: Detached Thread " << id << std::endl;
// Performs some arbitrary amount of work (optimized away here)
std::cout << "FINISH: Detached thread " << id << std::endl;
std::lock_guard<std::mutex> lock(m);
std::cout << "EXIT: Detached thread " << id << std::endl;
exit_confirm = 1;
cv.notify_all();
std::cout << "DROP" << std::endl;
}
// add support functions instead of doing these things in your normal code
void wait_for_exit_confirm() {
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [this] { return exit_confirm == 1; });
}
void spawn_detached() {
std::thread(&ThreadStruct::detachedThread, this).detach();
}
private:
std::condition_variable cv;
std::mutex m;
int exit_confirm = 0; // initialize
};
有了上面的内容,main
变得更干净了:
int main() {
int totalThreadCount = 1;
std::vector<ThreadStruct> perThreadData(totalThreadCount);
std::cout << "Main thread starting " << perThreadData.size() << " thread(s)\n";
int i = 0;
for(auto& threadData : perThreadData) {
threadData.id = i++;
threadData.spawn_detached();
}
for(auto& threadData : perThreadData) {
std::cout << "Waiting for mutex" << std::endl;
std::cout << "i=" << threadData.id << std::endl;
threadData.wait_for_exit_confirm();
std::cout << "i=" << threadData.id << " finished!" << std::endl;
}
std::cout << "Press enter to continue" << std::endl;
std::cin.get();
}
为了将来的兴趣:修复了问题中发布的原始 MWE。有两个问题
没有通过引用在 lambda 中捕获局部变量(参见其他答案)
1 次 wait()
调用过多
#include <cstdlib> // std::atoi
#include <iostream>
#include <thread>
#include <vector>
#include <random>
#include <future>
#include <condition_variable>
#include <mutex>
struct ThreadStruct
{
int id;
std::condition_variable cv;
std::mutex m;
int ok_to_exit;
int exit_confirm;
};
void Pause()
{
std::cout << "Press enter to continue" << std::endl;
std::cin.get();
}
void detachedThread(ThreadStruct* threadData)
{
std::cout << "START: Detached Thread " << threadData->id << std::endl;
// Performs some arbitrary amount of work.
for (int i = 0; i < 100000; ++i);
std::cout << "FINISH: Detached thread " << threadData->id << std::endl;
std::unique_lock<std::mutex> lock(threadData->m);
std::cout << "WAIT: Detached thread " << threadData->id << std::endl;
threadData->cv.wait(lock, [&threadData]{return threadData->ok_to_exit == 1;});
std::cout << "EXIT: Detached thread " << threadData->id << std::endl;
threadData->exit_confirm = 1;
threadData->cv.notify_all();
std::cout << "DROP" << std::endl;
}
int main(int argc, char** argv)
{
int totalThreadCount = 1;
ThreadStruct* perThreadData = new ThreadStruct[totalThreadCount];
std::cout << "Main thread starting " << totalThreadCount << " thread(s)" << std::endl;
for (int i = totalThreadCount - 1; i >= 0; --i)
{
perThreadData[i].id = i;
perThreadData[i].ok_to_exit = 0;
perThreadData[i].exit_confirm = 0;
std::thread t(detachedThread, &perThreadData[i]);
t.detach();
}
for(int i{0}; i < totalThreadCount; ++ i)
{
ThreadStruct *threadData = &perThreadData[i];
std::cout << "Waiting for lock - main() thread" << std::endl;
std::unique_lock<std::mutex> lock(perThreadData[i].m);
std::cout << "Lock obtained - main() thread" << std::endl;
//perThreadData[i].cv.wait(lock, [&threadData]{return threadData->ok_to_exit == 1;});
std::cout << "Wait complete" << std::endl;
threadData->ok_to_exit = 1;
threadData->cv.notify_all();
std::cout << "Done - main() thread" << std::endl;
}
for (int i{ 0 }; i < totalThreadCount; ++i)
{
std::size_t thread_index = i;
ThreadStruct& threadData = perThreadData[thread_index];
std::cout << "Waiting for mutex" << std::endl;
std::unique_lock<std::mutex> lock(threadData.m);
std::cout << "i=" << i << std::endl;
int& exit_confirm = threadData.exit_confirm;
threadData.cv.wait(lock, [&exit_confirm] {return exit_confirm == 1; });
std::cout << "i=" << i << " finished!" << std::endl;
}
Pause();
return 0;
}
我有一些代码,我正在处理其中生成分离线程的地方,做了一些工作,然后应该等待来自 main()
的信号,然后再将另一个信号发送回 main 指示线程已经退出了。
我对条件变量还很陌生,但是我以前使用过一些多线程代码。 (主要是互斥量。)
这是我尝试实现的,但它的行为与我预期的不同。 (我可能误解了什么。)
这背后的想法是将包含两个标志的结构传递给每个分离的线程。第一个标志表示 main()
表示“可以退出,并退出线程函数的末尾”。第二个标志由线程本身设置,并向 main()
发出线程确实已退出的信号。 (这只是为了确认来自 main()
的信号接收正常并发回一些东西。)
#include <cstdlib> // std::atoi
#include <iostream>
#include <thread>
#include <vector>
#include <random>
#include <future>
#include <condition_variable>
#include <mutex>
struct ThreadStruct
{
int id;
std::condition_variable cv;
std::mutex m;
int ok_to_exit;
int exit_confirm;
};
void Pause()
{
std::cout << "Press enter to continue" << std::endl;
std::cin.get();
}
void detachedThread(ThreadStruct* threadData)
{
std::cout << "START: Detached Thread " << threadData->id << std::endl;
// Performs some arbitrary amount of work.
for(int i = 0; i < 100000; ++ i);
std::cout << "FINISH: Detached thread " << threadData->id << std::endl;
std::unique_lock<std::mutex> lock(threadData->m);
std::cout << "WAIT: Detached thread " << threadData->id << std::endl;
threadData->cv.wait(lock, [threadData]{return threadData->ok_to_exit == 1;});
std::cout << "EXIT: Detached thread " << threadData->id << std::endl;
threadData->exit_confirm = 1;
}
int main(int argc, char** argv)
{
int totalThreadCount = 1;
ThreadStruct* perThreadData = new ThreadStruct[totalThreadCount];
std::cout << "Main thread starting " << totalThreadCount << " thread(s)" << std::endl;
for(int i = totalThreadCount - 1; i >= 0; --i)
{
perThreadData[i].id = i;
perThreadData[i].ok_to_exit = 0;
perThreadData[i].exit_confirm = 0;
std::thread t(detachedThread, &perThreadData[i]);
t.detach();
}
for(int i{0}; i < totalThreadCount; ++i)
{
ThreadStruct *threadData = &perThreadData[i];
std::cout << "Waiting for lock - main() thread" << std::endl;
std::unique_lock<std::mutex> lock(perThreadData[i].m);
std::cout << "Lock obtained - main() thread" << std::endl;
perThreadData[i].cv.wait(lock);
threadData->ok_to_exit = 1;
// added after comment from Sergey
threadData->cv.notify_all();
std::cout << "Done - main() thread" << std::endl;
}
for(int i{0}; i < totalThreadCount; ++i)
{
std::size_t thread_index = i;
ThreadStruct& threadData = perThreadData[thread_index];
std::unique_lock<std::mutex> lock(threadData.m);
std::cout << "i=" << i << std::endl;
int &exit_confirm = threadData.exit_confirm;
threadData.cv.wait(lock, [exit_confirm]{return exit_confirm == 1;});
std::cout << "i=" << i << " finished!" << std::endl;
}
Pause();
return 0;
}
这运行到行:
WAIT: Detached thread 0
但是分离的线程永远不会退出。我做错了什么?
编辑:进一步实验 - 这有帮助吗?
我认为通过删除一个步骤来简化事情可能会有所帮助。在下面的示例中,main()
不会向分离线程发送信号 ,它只是等待来自 分离线程的信号。
但同样,此代码在打印 DROP
后挂起...这意味着分离的线程正常退出,但 main()
不知道。
#include <cstdlib> // std::atoi
#include <iostream>
#include <thread>
#include <vector>
#include <random>
#include <future>
#include <condition_variable>
#include <mutex>
struct ThreadStruct
{
int id;
std::condition_variable cv;
std::mutex m;
int ok_to_exit;
int exit_confirm;
};
void Pause()
{
std::cout << "Press enter to continue" << std::endl;
std::cin.get();
}
void detachedThread(ThreadStruct* threadData)
{
std::cout << "START: Detached Thread " << threadData->id << std::endl;
// Performs some arbitrary amount of work.
for(int i = 0; i < 100000; ++ i);
std::cout << "FINISH: Detached thread " << threadData->id << std::endl;
std::unique_lock<std::mutex> lock(threadData->m);
std::cout << "EXIT: Detached thread " << threadData->id << std::endl;
threadData->exit_confirm = 1;
threadData->cv.notify_all();
std::cout << "DROP" << std::endl;
}
int main(int argc, char** argv)
{
int totalThreadCount = 1;
ThreadStruct* perThreadData = new ThreadStruct[totalThreadCount];
std::cout << "Main thread starting " << totalThreadCount << " thread(s)" << std::endl;
for(int i = totalThreadCount - 1; i >= 0; --i)
{
perThreadData[i].id = i;
perThreadData[i].ok_to_exit = 0;
perThreadData[i].exit_confirm = 0;
std::thread t(detachedThread, &perThreadData[i]);
t.detach();
}
for(int i{0}; i < totalThreadCount; ++i)
{
std::size_t thread_index = i;
ThreadStruct& threadData = perThreadData[thread_index];
std::cout << "Waiting for mutex" << std::endl;
std::unique_lock<std::mutex> lock(threadData.m);
std::cout << "i=" << i << std::endl;
int &exit_confirm = threadData.exit_confirm;
threadData.cv.wait(lock, [exit_confirm]{return exit_confirm == 1;});
std::cout << "i=" << i << " finished!" << std::endl;
}
Pause();
return 0;
}
您的 lambda 正在捕获 by-value,因此它永远不会看到对 exit_confim
.
捕获 by-reference 改为:
int& exit_confirm = threadData.exit_confirm;
threadData.cv.wait(lock, [&exit_confirm] { return exit_confirm == 1; });
// ^
// | capture by-reference
您还需要delete[]
您new[]
所做的
delete[] ThreadStruct;
当您完成 struct
s 时。
我还注意到在 free 之后有一些堆使用,但是当我对代码进行一些简化时,它神奇地消失了。我没有进一步调查。
一些建议:
- 将处理
ThreadStruct
成员变量和锁的代码移至ThreadStruct
class。它通常使阅读和维护更简单。 - 删除未使用的变量和headers。
- 不要使用
new[]
/delete[]
。对于此示例,您可以改用std::vector<ThreadStruct>
。 - 根本不要
detach()
- 我在下面没有做任何事情,但我建议使用join()
(在附加线程上)进行最后的同步。这就是它的用途。
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
struct ThreadStruct {
int id;
// move this function into the ThreadStruct class
void detachedThread() {
std::cout << "START: Detached Thread " << id << std::endl;
// Performs some arbitrary amount of work (optimized away here)
std::cout << "FINISH: Detached thread " << id << std::endl;
std::lock_guard<std::mutex> lock(m);
std::cout << "EXIT: Detached thread " << id << std::endl;
exit_confirm = 1;
cv.notify_all();
std::cout << "DROP" << std::endl;
}
// add support functions instead of doing these things in your normal code
void wait_for_exit_confirm() {
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [this] { return exit_confirm == 1; });
}
void spawn_detached() {
std::thread(&ThreadStruct::detachedThread, this).detach();
}
private:
std::condition_variable cv;
std::mutex m;
int exit_confirm = 0; // initialize
};
有了上面的内容,main
变得更干净了:
int main() {
int totalThreadCount = 1;
std::vector<ThreadStruct> perThreadData(totalThreadCount);
std::cout << "Main thread starting " << perThreadData.size() << " thread(s)\n";
int i = 0;
for(auto& threadData : perThreadData) {
threadData.id = i++;
threadData.spawn_detached();
}
for(auto& threadData : perThreadData) {
std::cout << "Waiting for mutex" << std::endl;
std::cout << "i=" << threadData.id << std::endl;
threadData.wait_for_exit_confirm();
std::cout << "i=" << threadData.id << " finished!" << std::endl;
}
std::cout << "Press enter to continue" << std::endl;
std::cin.get();
}
为了将来的兴趣:修复了问题中发布的原始 MWE。有两个问题
没有通过引用在 lambda 中捕获局部变量(参见其他答案)
1 次
wait()
调用过多#include <cstdlib> // std::atoi #include <iostream> #include <thread> #include <vector> #include <random> #include <future> #include <condition_variable> #include <mutex> struct ThreadStruct { int id; std::condition_variable cv; std::mutex m; int ok_to_exit; int exit_confirm; }; void Pause() { std::cout << "Press enter to continue" << std::endl; std::cin.get(); } void detachedThread(ThreadStruct* threadData) { std::cout << "START: Detached Thread " << threadData->id << std::endl; // Performs some arbitrary amount of work. for (int i = 0; i < 100000; ++i); std::cout << "FINISH: Detached thread " << threadData->id << std::endl; std::unique_lock<std::mutex> lock(threadData->m); std::cout << "WAIT: Detached thread " << threadData->id << std::endl; threadData->cv.wait(lock, [&threadData]{return threadData->ok_to_exit == 1;}); std::cout << "EXIT: Detached thread " << threadData->id << std::endl; threadData->exit_confirm = 1; threadData->cv.notify_all(); std::cout << "DROP" << std::endl; } int main(int argc, char** argv) { int totalThreadCount = 1; ThreadStruct* perThreadData = new ThreadStruct[totalThreadCount]; std::cout << "Main thread starting " << totalThreadCount << " thread(s)" << std::endl; for (int i = totalThreadCount - 1; i >= 0; --i) { perThreadData[i].id = i; perThreadData[i].ok_to_exit = 0; perThreadData[i].exit_confirm = 0; std::thread t(detachedThread, &perThreadData[i]); t.detach(); } for(int i{0}; i < totalThreadCount; ++ i) { ThreadStruct *threadData = &perThreadData[i]; std::cout << "Waiting for lock - main() thread" << std::endl; std::unique_lock<std::mutex> lock(perThreadData[i].m); std::cout << "Lock obtained - main() thread" << std::endl; //perThreadData[i].cv.wait(lock, [&threadData]{return threadData->ok_to_exit == 1;}); std::cout << "Wait complete" << std::endl; threadData->ok_to_exit = 1; threadData->cv.notify_all(); std::cout << "Done - main() thread" << std::endl; } for (int i{ 0 }; i < totalThreadCount; ++i) { std::size_t thread_index = i; ThreadStruct& threadData = perThreadData[thread_index]; std::cout << "Waiting for mutex" << std::endl; std::unique_lock<std::mutex> lock(threadData.m); std::cout << "i=" << i << std::endl; int& exit_confirm = threadData.exit_confirm; threadData.cv.wait(lock, [&exit_confirm] {return exit_confirm == 1; }); std::cout << "i=" << i << " finished!" << std::endl; } Pause(); return 0; }