使用 std::thread 时主线程中的循环卡住
While loop in main thread is getting stuck when using std::thread
我有一个简单的 C++ 代码来测试和理解线程。代码有主线程+副线程。
辅助更新主线程循环所依赖的变量的值。当我在主循环中添加打印语句时,程序成功完成,但是当我删除该打印语句时,它会进入无限循环。
这是我正在使用的代码,我指的打印语句是打印语句 2
#include <mpi.h>
#include <iostream>
#include <fstream>
#include <thread>
#include <mutex>
std::mutex mu;
int num;
using namespace std;
void WorkerFunction()
{
bool work = true;
while(work)
{
mu.lock();
num --;
mu.unlock();
if(num == 1)
work = false;
}
}
int main(int argc, char **argv)
{
bool work = true;
num = 10;
int numRanks, myRank, provided;
MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided);
MPI_Comm_size(MPI_COMM_WORLD, &numRanks);
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
std::thread workThread (WorkerFunction);
//print statement 1
cerr<<"Rank "<<myRank<<" Started workThread \n";
int mult = 0;
while(work)
{
mult += mult * num;
//print statement 2
if(myRank == 0) cerr<<"num = "<<num<<"\n";
if(num == 1)
work = false;
}
if(work == false)
workThread.join();
//print statement 3
cerr<<"Rank "<<myRank<<" Done with both threads \n";
MPI_Finalize();
};
这是我在打印语句 2 时得到的输出
mpirun -np 4 ./Testing
Rank 0 Started workThread
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
Rank 1 Started workThread
Rank 0 Done with both threads
Rank 1 Done with both threads
Rank 2 Started workThread
Rank 3 Started workThread
Rank 2 Done with both threads
Rank 3 Done with both threads
如果我注释掉那个 print 语句,它就会进入无限循环,这就是我得到的输出
mpirun -np 4 ./Testing
Rank 0 Started workThread
Rank 0 Done with both threads
Rank 1 Started workThread
Rank 2 Started workThread
Rank 3 Started workThread
Rank 2 Done with both threads
Rank 3 Done with both threads
我不确定我做错了什么,感谢您的帮助。
关于MPI,我没有任何经验。 (我几十年前就用过它,我确信这个事实完全没有价值。)但是,OP 声称
I have a simple C++ code to test and understand threading.
考虑到多处理(MPI
)和多线程(std::thread
)本身就是复杂的主题,我会先把主题分开,然后再把它们放在一起在每个方面都获得了一些经验。
所以,我详细说明了多线程(我觉得可以)。
第一个示例是 OP 代码的修订版(删除了所有对 MPI
的引用):
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
std::mutex mtxNum;
int num;
const std::chrono::milliseconds delay(100);
void WorkerFunction()
{
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
mtxNum.lock();
num_ = --num;
mtxNum.unlock();
work = num_ != 1;
}
}
int main()
{
num = 10;
std::thread workThread(&WorkerFunction);
int mult = 0;
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
mtxNum.lock();
num_ = num;
mtxNum.unlock();
std::cout << "num: " << num_ << '\n';
mult += mult * num_;
work = num_ != 1;
}
if (workThread.joinable()) workThread.join();
std::cout << "Both threads done.\n";
}
输出:
num: 10
num: 8
num: 7
num: 6
num: 5
num: 4
num: 3
num: 2
num: 2
num: 1
Both threads done.
备注:
而多线程是运行,变量num
是共享的,变量num
至少在一个线程中被修改,所以每次访问都要放到a critical section(一对互斥锁和解锁)。
临界区应始终保持尽可能短。 (一次只有一个线程可以通过临界区。因此,它引入了re-serialization,它消耗了speed-up,并发。)我在每个线程中引入了一个局部变量num_
来复制当前共享变量的值并在相应线程的临界区之后使用它。*
为了更好地说明,我在两个线程中添加了 sleep_for()
。没有,我得到了
num: 10
num: 1
Both threads done.
我觉得有点无聊。
输出跳过num == 9
并打印num == 2
两次。 (这在其他运行中可能看起来不同。)原因是线程根据定义异步工作。 (两个线程中 100 毫秒的相等延迟不是可靠的同步。)OS 负责在没有任何东西(例如锁定的互斥锁)阻止时唤醒线程。可以随时挂起线程。
关于mtxNum.lock()
/mtxNum.unlock()
:假设临界区包含比简单的--num;
更复杂的东西,可能会抛出异常。如果抛出异常,则跳过 mtxNum.unlock()
,并生成 deadlock 以阻止任何线程继续进行。
为此,std
库提供了一个非常方便的工具:std::lock_guard
:
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
std::mutex mtxNum;
int num;
const std::chrono::milliseconds delay(100);
void WorkerFunction()
{
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
{ std::lock_guard<std::mutex> lock(mtxNum); // does the mtxNum.lock()
num_ = --num;
} // destructor of lock does the mtxNum.unlock()
work = num_ != 1;
}
}
int main()
{
num = 10;
std::thread workThread(&WorkerFunction);
int mult = 0;
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
{ std::lock_guard<std::mutex> lock(mtxNum); // does the mtxNum.lock()
num_ = num;
} // destructor of lock does the mtxNum.unlock()
std::cout << "num: " << num_ << '\n';
mult += mult * num_;
work = num_ != 1;
}
if (workThread.joinable()) workThread.join();
std::cout << "Both threads done.\n";
}
输出:
num: 10
num: 8
num: 7
num: 6
num: 5
num: 4
num: 3
num: 2
num: 1
Both threads done.
std::lock_guard
的诀窍是析构函数在任何情况下都会解锁互斥锁,即使在临界区内抛出异常也是如此。
可能是,我有点偏执,但令我恼火的是 non-guarded 访问共享变量可能是偶然发生的,而没有在任何调试会话或任何编译器诊断中被注意到。 ** 因此,将共享变量隐藏到 class 中可能是值得的,只有锁定它才能访问。为此,我在示例中引入了Shared
:
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
template <typename T>
class Shared {
public:
struct Lock {
Shared &shared;
std::lock_guard<std::mutex> lock;
Lock(Shared &shared): shared(shared), lock(shared._mtx) { }
~Lock() = default;
Lock(const Lock&) = delete;
Lock& operator=(const Lock&) = delete;
const T& get() const { return shared._value; }
T& get() { return shared._value; }
};
private:
std::mutex _mtx;
T _value;
public:
Shared() = default;
explicit Shared(T &&value): _value(std::move(value)) { }
~Shared() = default;
Shared(const Shared&) = delete;
Shared& operator=(const Shared&) = delete;
};
typedef Shared<int> SharedInt;
SharedInt shNum(10);
const std::chrono::milliseconds delay(100);
void WorkerFunction()
{
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
{ SharedInt::Lock lock(shNum);
num_ = --lock.get();
}
work = num_ != 1;
}
}
int main()
{
std::thread workThread(&WorkerFunction);
int mult = 0;
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
{ const SharedInt::Lock lock(shNum);
num_ = lock.get();
}
std::cout << "num: " << num_ << '\n';
mult += mult * num_;
work = num_ != 1;
}
if (workThread.joinable()) workThread.join();
std::cout << "Both threads done.\n";
}
输出:与以前类似。
诀窍是可以从 Shared::Lock
实例中检索对共享值的引用 → 即当它被锁定时。即使存储引用:
{ SharedInt::Lock lock(shNum);
int &num = lock.get();
num_ = --num;
}
int &num
的生命周期刚好在 SharedInt::Lock lock(shNum);
的生命周期之前结束。
当然,人们可以获得指向 num
的指针以在范围之外使用它,但我认为这是破坏行为。
还有一件事,我想提的是std::atomic
:
The atomic library provides components for fine-grained atomic operations allowing for lockless concurrent programming. Each atomic operation is indivisible with regards to any other atomic operation that involves the same object.
虽然互斥锁可能是 OS 内核函数的主题,但原子访问可能会利用 CPU 功能完成,而无需进入内核。 (这可能会提供 speed-up 并减少对 OS 资源的使用。)
更好的是,如果没有 H/W 支持 resp。 type available 它回退到基于 互斥锁或其他锁定操作 的实现(根据 std::atomic<T>::is_lock_free()
中的注释):
All atomic types except for std::atomic_flag may be implemented using mutexes or other locking operations, rather than using the lock-free atomic CPU instructions. Atomic types are also allowed to be sometimes lock-free, e.g. if only aligned memory accesses are naturally atomic on a given architecture, misaligned objects of the same type have to use locks.
std::atomic
修改后的样本:
#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>
std::atomic<int> num;
const std::chrono::milliseconds delay(100);
void WorkerFunction()
{
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
work = --num != 1;
}
}
int main()
{
num = 10;
std::thread workThread(&WorkerFunction);
int mult = 0;
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
const int num_ = num;
std::cout << "num: " << num_ << '\n';
mult += mult * num_;
work = num_ != 1;
}
if (workThread.joinable()) workThread.join();
std::cout << "Both threads done.\n";
}
输出:
num: 10
num: 8
num: 7
num: 7
num: 5
num: 4
num: 3
num: 3
num: 1
Both threads done.
* 我在 WorkingThread()
上沉思了一会儿。如果它是修改 num
的唯一线程,那么在关键部分之外对 num
(在 WorkingThread()
中)的读取访问应该是安全的——我相信。但是,至少,为了可维护性,我不会这样做。
** 根据我的个人经验,此类错误很少(或从不)出现在调试会话中,而是出现在向客户演示的前 180 秒内。
我有一个简单的 C++ 代码来测试和理解线程。代码有主线程+副线程。 辅助更新主线程循环所依赖的变量的值。当我在主循环中添加打印语句时,程序成功完成,但是当我删除该打印语句时,它会进入无限循环。 这是我正在使用的代码,我指的打印语句是打印语句 2
#include <mpi.h>
#include <iostream>
#include <fstream>
#include <thread>
#include <mutex>
std::mutex mu;
int num;
using namespace std;
void WorkerFunction()
{
bool work = true;
while(work)
{
mu.lock();
num --;
mu.unlock();
if(num == 1)
work = false;
}
}
int main(int argc, char **argv)
{
bool work = true;
num = 10;
int numRanks, myRank, provided;
MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided);
MPI_Comm_size(MPI_COMM_WORLD, &numRanks);
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
std::thread workThread (WorkerFunction);
//print statement 1
cerr<<"Rank "<<myRank<<" Started workThread \n";
int mult = 0;
while(work)
{
mult += mult * num;
//print statement 2
if(myRank == 0) cerr<<"num = "<<num<<"\n";
if(num == 1)
work = false;
}
if(work == false)
workThread.join();
//print statement 3
cerr<<"Rank "<<myRank<<" Done with both threads \n";
MPI_Finalize();
};
这是我在打印语句 2 时得到的输出
mpirun -np 4 ./Testing
Rank 0 Started workThread
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
Rank 1 Started workThread
Rank 0 Done with both threads
Rank 1 Done with both threads
Rank 2 Started workThread
Rank 3 Started workThread
Rank 2 Done with both threads
Rank 3 Done with both threads
如果我注释掉那个 print 语句,它就会进入无限循环,这就是我得到的输出
mpirun -np 4 ./Testing
Rank 0 Started workThread
Rank 0 Done with both threads
Rank 1 Started workThread
Rank 2 Started workThread
Rank 3 Started workThread
Rank 2 Done with both threads
Rank 3 Done with both threads
我不确定我做错了什么,感谢您的帮助。
关于MPI,我没有任何经验。 (我几十年前就用过它,我确信这个事实完全没有价值。)但是,OP 声称
I have a simple C++ code to test and understand threading.
考虑到多处理(MPI
)和多线程(std::thread
)本身就是复杂的主题,我会先把主题分开,然后再把它们放在一起在每个方面都获得了一些经验。
所以,我详细说明了多线程(我觉得可以)。
第一个示例是 OP 代码的修订版(删除了所有对 MPI
的引用):
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
std::mutex mtxNum;
int num;
const std::chrono::milliseconds delay(100);
void WorkerFunction()
{
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
mtxNum.lock();
num_ = --num;
mtxNum.unlock();
work = num_ != 1;
}
}
int main()
{
num = 10;
std::thread workThread(&WorkerFunction);
int mult = 0;
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
mtxNum.lock();
num_ = num;
mtxNum.unlock();
std::cout << "num: " << num_ << '\n';
mult += mult * num_;
work = num_ != 1;
}
if (workThread.joinable()) workThread.join();
std::cout << "Both threads done.\n";
}
输出:
num: 10
num: 8
num: 7
num: 6
num: 5
num: 4
num: 3
num: 2
num: 2
num: 1
Both threads done.
备注:
而多线程是运行,变量
num
是共享的,变量num
至少在一个线程中被修改,所以每次访问都要放到a critical section(一对互斥锁和解锁)。临界区应始终保持尽可能短。 (一次只有一个线程可以通过临界区。因此,它引入了re-serialization,它消耗了speed-up,并发。)我在每个线程中引入了一个局部变量
num_
来复制当前共享变量的值并在相应线程的临界区之后使用它。*为了更好地说明,我在两个线程中添加了
sleep_for()
。没有,我得到了num: 10 num: 1 Both threads done.
我觉得有点无聊。
输出跳过
num == 9
并打印num == 2
两次。 (这在其他运行中可能看起来不同。)原因是线程根据定义异步工作。 (两个线程中 100 毫秒的相等延迟不是可靠的同步。)OS 负责在没有任何东西(例如锁定的互斥锁)阻止时唤醒线程。可以随时挂起线程。
关于mtxNum.lock()
/mtxNum.unlock()
:假设临界区包含比简单的--num;
更复杂的东西,可能会抛出异常。如果抛出异常,则跳过 mtxNum.unlock()
,并生成 deadlock 以阻止任何线程继续进行。
为此,std
库提供了一个非常方便的工具:std::lock_guard
:
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
std::mutex mtxNum;
int num;
const std::chrono::milliseconds delay(100);
void WorkerFunction()
{
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
{ std::lock_guard<std::mutex> lock(mtxNum); // does the mtxNum.lock()
num_ = --num;
} // destructor of lock does the mtxNum.unlock()
work = num_ != 1;
}
}
int main()
{
num = 10;
std::thread workThread(&WorkerFunction);
int mult = 0;
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
{ std::lock_guard<std::mutex> lock(mtxNum); // does the mtxNum.lock()
num_ = num;
} // destructor of lock does the mtxNum.unlock()
std::cout << "num: " << num_ << '\n';
mult += mult * num_;
work = num_ != 1;
}
if (workThread.joinable()) workThread.join();
std::cout << "Both threads done.\n";
}
输出:
num: 10
num: 8
num: 7
num: 6
num: 5
num: 4
num: 3
num: 2
num: 1
Both threads done.
std::lock_guard
的诀窍是析构函数在任何情况下都会解锁互斥锁,即使在临界区内抛出异常也是如此。
可能是,我有点偏执,但令我恼火的是 non-guarded 访问共享变量可能是偶然发生的,而没有在任何调试会话或任何编译器诊断中被注意到。 ** 因此,将共享变量隐藏到 class 中可能是值得的,只有锁定它才能访问。为此,我在示例中引入了Shared
:
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
template <typename T>
class Shared {
public:
struct Lock {
Shared &shared;
std::lock_guard<std::mutex> lock;
Lock(Shared &shared): shared(shared), lock(shared._mtx) { }
~Lock() = default;
Lock(const Lock&) = delete;
Lock& operator=(const Lock&) = delete;
const T& get() const { return shared._value; }
T& get() { return shared._value; }
};
private:
std::mutex _mtx;
T _value;
public:
Shared() = default;
explicit Shared(T &&value): _value(std::move(value)) { }
~Shared() = default;
Shared(const Shared&) = delete;
Shared& operator=(const Shared&) = delete;
};
typedef Shared<int> SharedInt;
SharedInt shNum(10);
const std::chrono::milliseconds delay(100);
void WorkerFunction()
{
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
{ SharedInt::Lock lock(shNum);
num_ = --lock.get();
}
work = num_ != 1;
}
}
int main()
{
std::thread workThread(&WorkerFunction);
int mult = 0;
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
{ const SharedInt::Lock lock(shNum);
num_ = lock.get();
}
std::cout << "num: " << num_ << '\n';
mult += mult * num_;
work = num_ != 1;
}
if (workThread.joinable()) workThread.join();
std::cout << "Both threads done.\n";
}
输出:与以前类似。
诀窍是可以从 Shared::Lock
实例中检索对共享值的引用 → 即当它被锁定时。即使存储引用:
{ SharedInt::Lock lock(shNum);
int &num = lock.get();
num_ = --num;
}
int &num
的生命周期刚好在 SharedInt::Lock lock(shNum);
的生命周期之前结束。
当然,人们可以获得指向 num
的指针以在范围之外使用它,但我认为这是破坏行为。
还有一件事,我想提的是std::atomic
:
The atomic library provides components for fine-grained atomic operations allowing for lockless concurrent programming. Each atomic operation is indivisible with regards to any other atomic operation that involves the same object.
虽然互斥锁可能是 OS 内核函数的主题,但原子访问可能会利用 CPU 功能完成,而无需进入内核。 (这可能会提供 speed-up 并减少对 OS 资源的使用。)
更好的是,如果没有 H/W 支持 resp。 type available 它回退到基于 互斥锁或其他锁定操作 的实现(根据 std::atomic<T>::is_lock_free()
中的注释):
All atomic types except for std::atomic_flag may be implemented using mutexes or other locking operations, rather than using the lock-free atomic CPU instructions. Atomic types are also allowed to be sometimes lock-free, e.g. if only aligned memory accesses are naturally atomic on a given architecture, misaligned objects of the same type have to use locks.
std::atomic
修改后的样本:
#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>
std::atomic<int> num;
const std::chrono::milliseconds delay(100);
void WorkerFunction()
{
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
work = --num != 1;
}
}
int main()
{
num = 10;
std::thread workThread(&WorkerFunction);
int mult = 0;
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
const int num_ = num;
std::cout << "num: " << num_ << '\n';
mult += mult * num_;
work = num_ != 1;
}
if (workThread.joinable()) workThread.join();
std::cout << "Both threads done.\n";
}
输出:
num: 10
num: 8
num: 7
num: 7
num: 5
num: 4
num: 3
num: 3
num: 1
Both threads done.
* 我在 WorkingThread()
上沉思了一会儿。如果它是修改 num
的唯一线程,那么在关键部分之外对 num
(在 WorkingThread()
中)的读取访问应该是安全的——我相信。但是,至少,为了可维护性,我不会这样做。
** 根据我的个人经验,此类错误很少(或从不)出现在调试会话中,而是出现在向客户演示的前 180 秒内。