为什么这个 c++ 多线程互斥代码偶尔会出现故障?
Why is this c++ multithreading mutex code exhibiting occasional failures?
我在 linux Debian 系统上使用下面的 foo.cpp 代码:
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <thread>
std::mutex mtx;
std::condition_variable cvar;
long next = 0;
void doit(long index){
std::unique_lock<std::mutex> lock(mtx);
cvar.wait(lock, [=]{return index == next;});
std::cout<< index << std::endl;
++next;
mtx.unlock();
cvar.notify_all();
return;
}
int main()
{
long n=50;
for (long i=0; i < n; ++i)
std::thread (doit,i).detach();
while(next != n)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return(0);
}
我编译它:
g++ -std=c++14 -pthread -o foo foo.cpp
它被设计为触发 50 个线程,分离的,它们由一个互斥体控制,condition_variable 在函数 doit 中,因此它们顺序执行互斥体块。
大部分时间都在工作,将数字 00 到 49 写入屏幕,然后终止。
但是,它有两种偶然的故障模式:
故障模式 1:上升到某个小于 50 的任意数字后,它中止并出现错误:
foo: ../nptl/pthread_mutex_lock.c:80: __pthread_mutex_lock: 断言 `mutex->__data.__owner == 0' 失败。
失败模式 2:上升到小于 50 的某个任意数字后,它挂起,必须用 ctrl-C 终止才能返回到终端提示符。
对于此行为的原因以及如何修复它的任何建议,我将不胜感激。
============================================= ============================
编辑: 好的,这是一个有效的修订版。我修复了这两个错误,并将锁名称从 "lock" 更改为 "lk" 以减少混淆。感谢您的帮助。
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
std::mutex mtx;
std::condition_variable cvar;
long next = 0;
void doit(long index){
std::unique_lock<std::mutex> lk(mtx);
cvar.wait(lk, [=]{return index == next;});
std::cout<< index << std::endl;
++next;
lk.unlock();
cvar.notify_all();
return;
}
int main()
{
long n=50;
for (long i=0; i < n; ++i)
std::thread (doit,i).detach();
{
std::unique_lock<std::mutex> lk(mtx);
cvar.wait(lk, [=]{return n == next;});
}
return(0);
}
while(next != n)
尝试访问变量 next
,该变量可由工作线程修改,无需任何同步创建竞争条件。它应该被同一个互斥锁覆盖:
{
std::unique_lock<std::mutex> lock(mtx);
cvar.wait(lock, [=]{return n == next;});
}
分离线程不是一个好主意。在从 main
.
返回之前,您应该将它们存储在某个地方然后 join
更新:您试图在 mutex
本身上调用 unlock
,而不是在锁定对象上调用它。通过构造锁定对象,您将解锁互斥体的责任委托给 lock
对象。应该是
lock.unlock();
cvar.notify_all();
为什么不保持简单?
int main() {
long n = 50;
std::vector<std::thread> threads;
for (long i = 0; i < n; ++i)
threads.emplace_back([=]() { std::cout << i << std::endl; });
for (const auto& t : threads) {
t.join();
}
return 0;
}
试试这个片段:你不应该使用 mtx.unlock() 而让 condition_variable 来完成这项工作。同样使用std::ref将函数参数传递给线程。
std::mutex mtx;
std::condition_variable cvar;
bool ready = true;
void doit(long index) {
std::unique_lock<std::mutex> lock(mtx);
cvar.wait(lock, [=] {return ready == true; });
ready = false;
std::cout << index << std::endl;
ready = true;
cvar.notify_all();
return;
}
int main()
{
long n = 50;
for (long i = 0; i < n; ++i)
std::thread(doit, std::ref(i)).detach();
std::this_thread::sleep_for(std::chrono::seconds(3));
return(0);
}
std:: unique_lock
是一个 RAII 对象。在范围内声明它,然后将您的烦恼抛到九霄云外。问题来了:在 doit 调用 mtx.unlock() 之后,偶尔 next 语句 cvar.notify_all() 会立即用(新的)next == index 唤醒线程。该线程将获取互斥锁。当 doit returns 时,锁析构函数试图释放互斥锁,但它被另一个线程持有。灾难接踵而至。操作方法如下():
void doit(long index) {
{
std::unique_lock<std::mutex> lock(mtx);
cvar.wait(lock, [=] {return index == next; });
++next;
std::cout << index << std::endl;
}
cvar.notify_all();
return;
}
我不建议分离线程,因为之后你无法加入它们。
如果你真的要做,那就用条件变量来同步下一段时间的数据。
void doit(long index){
std::unique_lock<std::mutex> lock(mtx);
cvar.wait(lock, [=]{return index == next;});
std::cout<< index << std::endl;
++next;
cvar.notify_all();
return;
}
int main()
{
long n=50;
for (long i=0; i < n; ++i)
std::thread (doit,i).detach();
//here you wait for the last thread to finish
{
std::unique_lock<std::mutex> lock(mtx);
cvar.wait(lock, [=]{return n == next;});
}
return(0);
}
如果您可以让您的线程可连接,您可以编写更简单的代码。
std::mutex mtx;
std::condition_variable cvar;
long next = 0;
void doit(long index){
std::unique_lock<std::mutex> lock(mtx);
//this guarantees the order in which are being executed
cvar.wait(lock, [=]{return index == next;});
std::cout<< index << std::endl;
++next;
cvar.notify_all();//wakes all the thread, only the one with index=next will be executed
return;
}
int main()
{
long n=50;
std::vector<std::thread> workers;
for (long i=0; i < n; ++i){
workers.emplace_back(std::thread (doit,i));
}
//this guarantees your threads are all finished at the end of this block
for (auto& t : workers) {
t.join();
}
return(0);
}
我在 linux Debian 系统上使用下面的 foo.cpp 代码:
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <thread>
std::mutex mtx;
std::condition_variable cvar;
long next = 0;
void doit(long index){
std::unique_lock<std::mutex> lock(mtx);
cvar.wait(lock, [=]{return index == next;});
std::cout<< index << std::endl;
++next;
mtx.unlock();
cvar.notify_all();
return;
}
int main()
{
long n=50;
for (long i=0; i < n; ++i)
std::thread (doit,i).detach();
while(next != n)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return(0);
}
我编译它:
g++ -std=c++14 -pthread -o foo foo.cpp
它被设计为触发 50 个线程,分离的,它们由一个互斥体控制,condition_variable 在函数 doit 中,因此它们顺序执行互斥体块。
大部分时间都在工作,将数字 00 到 49 写入屏幕,然后终止。
但是,它有两种偶然的故障模式:
故障模式 1:上升到某个小于 50 的任意数字后,它中止并出现错误:
foo: ../nptl/pthread_mutex_lock.c:80: __pthread_mutex_lock: 断言 `mutex->__data.__owner == 0' 失败。
失败模式 2:上升到小于 50 的某个任意数字后,它挂起,必须用 ctrl-C 终止才能返回到终端提示符。
对于此行为的原因以及如何修复它的任何建议,我将不胜感激。
============================================= ============================
编辑: 好的,这是一个有效的修订版。我修复了这两个错误,并将锁名称从 "lock" 更改为 "lk" 以减少混淆。感谢您的帮助。
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
std::mutex mtx;
std::condition_variable cvar;
long next = 0;
void doit(long index){
std::unique_lock<std::mutex> lk(mtx);
cvar.wait(lk, [=]{return index == next;});
std::cout<< index << std::endl;
++next;
lk.unlock();
cvar.notify_all();
return;
}
int main()
{
long n=50;
for (long i=0; i < n; ++i)
std::thread (doit,i).detach();
{
std::unique_lock<std::mutex> lk(mtx);
cvar.wait(lk, [=]{return n == next;});
}
return(0);
}
while(next != n)
尝试访问变量 next
,该变量可由工作线程修改,无需任何同步创建竞争条件。它应该被同一个互斥锁覆盖:
{
std::unique_lock<std::mutex> lock(mtx);
cvar.wait(lock, [=]{return n == next;});
}
分离线程不是一个好主意。在从 main
.
join
更新:您试图在 mutex
本身上调用 unlock
,而不是在锁定对象上调用它。通过构造锁定对象,您将解锁互斥体的责任委托给 lock
对象。应该是
lock.unlock();
cvar.notify_all();
为什么不保持简单?
int main() {
long n = 50;
std::vector<std::thread> threads;
for (long i = 0; i < n; ++i)
threads.emplace_back([=]() { std::cout << i << std::endl; });
for (const auto& t : threads) {
t.join();
}
return 0;
}
试试这个片段:你不应该使用 mtx.unlock() 而让 condition_variable 来完成这项工作。同样使用std::ref将函数参数传递给线程。
std::mutex mtx;
std::condition_variable cvar;
bool ready = true;
void doit(long index) {
std::unique_lock<std::mutex> lock(mtx);
cvar.wait(lock, [=] {return ready == true; });
ready = false;
std::cout << index << std::endl;
ready = true;
cvar.notify_all();
return;
}
int main()
{
long n = 50;
for (long i = 0; i < n; ++i)
std::thread(doit, std::ref(i)).detach();
std::this_thread::sleep_for(std::chrono::seconds(3));
return(0);
}
std:: unique_lock
是一个 RAII 对象。在范围内声明它,然后将您的烦恼抛到九霄云外。问题来了:在 doit 调用 mtx.unlock() 之后,偶尔 next 语句 cvar.notify_all() 会立即用(新的)next == index 唤醒线程。该线程将获取互斥锁。当 doit returns 时,锁析构函数试图释放互斥锁,但它被另一个线程持有。灾难接踵而至。操作方法如下():
void doit(long index) {
{
std::unique_lock<std::mutex> lock(mtx);
cvar.wait(lock, [=] {return index == next; });
++next;
std::cout << index << std::endl;
}
cvar.notify_all();
return;
}
我不建议分离线程,因为之后你无法加入它们。 如果你真的要做,那就用条件变量来同步下一段时间的数据。
void doit(long index){
std::unique_lock<std::mutex> lock(mtx);
cvar.wait(lock, [=]{return index == next;});
std::cout<< index << std::endl;
++next;
cvar.notify_all();
return;
}
int main()
{
long n=50;
for (long i=0; i < n; ++i)
std::thread (doit,i).detach();
//here you wait for the last thread to finish
{
std::unique_lock<std::mutex> lock(mtx);
cvar.wait(lock, [=]{return n == next;});
}
return(0);
}
如果您可以让您的线程可连接,您可以编写更简单的代码。
std::mutex mtx;
std::condition_variable cvar;
long next = 0;
void doit(long index){
std::unique_lock<std::mutex> lock(mtx);
//this guarantees the order in which are being executed
cvar.wait(lock, [=]{return index == next;});
std::cout<< index << std::endl;
++next;
cvar.notify_all();//wakes all the thread, only the one with index=next will be executed
return;
}
int main()
{
long n=50;
std::vector<std::thread> workers;
for (long i=0; i < n; ++i){
workers.emplace_back(std::thread (doit,i));
}
//this guarantees your threads are all finished at the end of this block
for (auto& t : workers) {
t.join();
}
return(0);
}