如何防止用餐哲学家c ++中的死锁
How to prevent deadlock in dining philosopher c++
我正在尝试解决哲学家就餐问题中的一个僵局。我已经有了我的老师提供的代码框架。
我尝试使用 try_lock()
解决问题
chopstick[(i+1)%5].try_lock();
但这并没有解决我的问题,当我运行这样做时,我确实收到了以下错误消息:
错误 "unlock of unowned mutex".
我还尝试通过在 youtube 视频中看到的以下更改来解决问题
chopstick[i].lock();
至
chopstick[min(i,(i+1)%5)].lock();
还有
chopstick[(i+1)%5].lock();
至
chopstick[max(i,(i+1)%5)].lock();
这是我得到的骨架。
#include <windows.h>
#include <stdio.h>
#include <iostream>
#include <vector>
#include <algorithm>
#include <thread>
#include <mutex>
#include <time.h>
using namespace std;
thread task[5];
mutex chopstick[5];
int stop = false;
void go(int i) {
while (!stop) {
chopstick[i].lock();
cout << i << ": takes: " << i << endl;
chrono::milliseconds dur(20);
this_thread::sleep_for(dur); //Leads to deadlock immediately
chopstick[(i + 1) % 5].lock();
cout << i << ": eating" << endl;
chrono::milliseconds dur2(rand() % 200 + 100);
this_thread::sleep_for(dur2);
chopstick[(i + 1) % 5].unlock();
chopstick[i].unlock();
}
}
int main() {
srand(time(NULL));
for (int i = 0; i < 5; ++i) {
task[i] = (thread(go, i));
}
for (int i = 0; i < 5; i++) {
task[i].join();
}
}
我在理论上理解餐饮哲学家,但我无法解决这个问题。我真的不明白我做错了什么。有人可以解释一下我做错了什么并帮我解决吗?
并行编程(使用锁)的核心规则之一是,您应该始终以相同的顺序获取锁。
在您的代码中,每个任务首先获取其锁,然后获取下一个锁。一种解决方案是始终从偶数索引获取锁,然后才从奇数索引获取锁。这样,您获取锁的顺序将保持一致。
另一个众所周知的策略是'backoff',其中您使用lock()
获取第一个锁,并使用try_lock()
获取后续锁,如果无法获取,您释放所有获得的锁并重新开始序列。这种策略在性能方面并不好,但它保证最终会起作用。
修复死锁的最简单方法是使用专为此目的而发明的std::lock(l1, l2)
。
变化:
chopstick[i].lock();
cout << i << ": takes: " << i << endl;
chrono::milliseconds dur(20);
this_thread::sleep_for(dur); //Leads to deadlock immediately
chopstick[(i + 1) % 5].lock();
至:
std::lock(chopstick[i], chopstick[(i + 1) % 5]);
cout << i << ": takes: " << i << endl;
这是一个直接的解决方案,忽略了异常安全,这对于你的第一个死锁避免课程来说很好。
为了使其异常安全,需要将互斥量包装在 RAII 设备中:std::unique_lock
:
unique_lock<mutex> left{chopstick[i], defer_lock};
unique_lock<mutex> right{chopstick[(i + 1) % 5], defer_lock};
lock(left, right);
cout << i << ": takes: " << i << endl;
然后你还应该删除显式的 unlock
语句,因为 left
和 right
的析构函数会处理这些。
现在,如果 go
中有任何异常抛出,left
和 right
的析构函数将在异常传播出去时解锁互斥量。
要了解更多关于 std::lock
幕后发生的事情以避免死锁,请参阅:http://howardhinnant.github.io/dining_philosophers.html
性能测试
这是一个快速简单的测试,用于比较 std::lock
与更传统的建议 "order your mutexes" 的使用。
#ifndef USE_STD_LOCK
# error #define USE_STD_LOCK as 1 to use std::lock and as 0 to use ordering
#endif
#include <atomic>
#include <chrono>
#include <exception>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <thread>
std::thread task[5];
constexpr auto N = sizeof(task)/sizeof(task[0]);
std::mutex chopstick[N];
std::atomic<bool> stop{false};
unsigned long long counts[N] = {};
using namespace std::chrono_literals;
void
go(decltype(N) i)
{
auto const right = (i + 1) % N;
decltype(right) const left = i;
while (!stop)
{
#if USE_STD_LOCK
std::lock(chopstick[left], chopstick[right]);
#else
if (left < right)
{
chopstick[left].lock();
chopstick[right].lock();
}
else
{
chopstick[right].lock();
chopstick[left].lock();
}
#endif
std::lock_guard<std::mutex> l1{chopstick[left], std::adopt_lock};
std::lock_guard<std::mutex> l2{chopstick[right], std::adopt_lock};
++counts[i];
std::this_thread::sleep_for(1ms);
}
}
void
deadlock_detector(std::chrono::seconds time_out)
{
std::this_thread::sleep_for(time_out);
std::cerr << "Deadlock!\n";
std::terminate();
}
int
main()
{
for (auto i = 0u; i < N; ++i)
task[i] = std::thread{go, i};
std::thread{deadlock_detector, 15s}.detach();
std::this_thread::sleep_for(10s);
stop = true;
for (auto& t : task)
t.join();
std::cout << std::right;
for (auto c : counts)
std::cout << std::setw(6) << c << '\n';
auto count = std::accumulate(std::begin(counts), std::end(counts), 0ULL);
std::cout << "+ ----\n";
std::cout << std::setw(6) << count << '\n';
}
编译时必须定义 USE_STD_LOCK
:
#define USE_STD_LOCK 0
对您的互斥量进行排序并按顺序锁定它们。
#define USE_STD_LOCK 1
用 std::lock
锁定你的互斥量。
程序 运行s 持续 10 秒,每个线程尽可能频繁地递增一个不同的 unsigned long long
。但是为了让事情变得更加戏剧化,每个线程在持有锁的同时也会休眠 1 毫秒(运行 如果你愿意,可以不休眠)。
10 秒后,main
告诉每个人轮班结束并计算每个线程的结果,以及所有线程的总增量。越高越好。
运行 优化已启用,我得到的数字如下:
USE_STD_LOCK = 1
3318
2644
3254
3004
2876
+ ----
15096
USE_STD_LOCK = 0
19
96
1641
5885
50
+ ----
7691
请注意,使用 std::lock
不仅会导致 多 更高的累积结果,而且每个线程对总数的贡献大致相同。相比之下,"ordering" 倾向于选择单线程,使其他线程处于饥饿状态,在某些情况下非常严重。
这是在 4 核英特尔酷睿 i5 上。我将差异归因于拥有多个内核,这样至少有两个线程可以同时 运行。如果这是 运行 在单个核心设备上,我不会期望这种差异(我没有测试该配置)。
我还使用死锁检测器对测试进行了检测。这不会影响我得到的结果。它旨在让人们尝试其他锁定算法,并更快地确定测试是否已锁定。如果这个死锁检测器以任何方式打扰您,只需将它从您的测试中删除 运行。我不想争论它的优点。
我欢迎您提供建设性的反馈,无论您得到类似的结果还是不同的结果。或者,如果您认为此测试存在某种偏见,以及如何改进它。
C++17 更新
在 C++17 及更高版本中,与使用 unique_lock
/lock_guard
相比,调用 std::lock
的方法更为简洁。您可以使用 scoped_lock
而不是在后台为您调用 std::lock
。即改变:
unique_lock<mutex> left{chopstick[i], defer_lock};
unique_lock<mutex> right{chopstick[(i + 1) % 5], defer_lock};
lock(left, right);
至:
scoped_lock lk{chopstick[i], chopstick[(i + 1) % 5]};
这一行在一个方便的包中为您提供 std::lock
的死锁避免和 unique_lock
/lock_guard
的异常安全.
有四 (4) 个条件需要并且足以产生死锁.
死锁条件
- 资源互斥(资源无法共享)
考虑(请求)的资源不能共享。当允许共享资源时,不会阻止(兄弟)进程在需要时获取资源。
- 资源等待或坚持(部分分配)
进程必须持有已分配的资源,并等待(尝试占用)后续(请求的)资源。当请求新资源时进程必须释放持有的资源时,不会发生死锁,因为进程不会阻止(兄弟)进程在需要时获取资源。
- 资源不允许抢占(进程平等或公平)
进程在持有时不能带走资源。否则,更高优先级(排名)的进程将简单地占用(占用)足够的资源以使进程能够完成。许多RTOS使用这种方法来防止死锁。
- 资源循环顺序或等待(资源图中存在循环)
资源存在循环排序或循环(链),资源不能按部分顺序排列(编号最小..最大)。当可以对资源施加部分顺序时,可以根据该严格顺序获取(锁定)资源,并且不会发生死锁(请参阅循环定理,其中指出 "cycle in a resource graph is necessary so that deadlock can occur")。
哲学家就餐问题(通过实验)构建为呈现所有四个条件,挑战 是决定要避免(打破)哪些条件。一个 经典 答案是更改资源的顺序以打破 循环等待 条件。每个哲学家独立决定如何解决死锁。
- 可共享 - 每个哲学家需要两个叉子,不能共享。
- 坚持不懈 - 每个哲学家必须在拿另一把叉子的同时保留一把叉子。
- 没有抢占 - 没有哲学家会从另一个人那里拿叉子。
- Circular order - 有一个循环,所以两个哲学家碰撞并陷入僵局。
有几个众所周知的解决方案:
- Djikstra 的解决方案 - 为叉子编号 (1 .. N),所有哲学家都按照以下规则取叉子:取编号较低的叉子,然后取编号较高的叉子,并在碰撞时释放所有资源。
- Arbitrator (monitor) - 仲裁员分配叉子,当哲学家想吃饭时,他们询问仲裁员,仲裁员将他们的请求序列化,并在可用时分配(分发)资源(叉子)。
Djikstra 是规范的解决方案 - 为您的叉子编号。
我正在尝试解决哲学家就餐问题中的一个僵局。我已经有了我的老师提供的代码框架。
我尝试使用 try_lock()
解决问题chopstick[(i+1)%5].try_lock();
但这并没有解决我的问题,当我运行这样做时,我确实收到了以下错误消息: 错误 "unlock of unowned mutex".
我还尝试通过在 youtube 视频中看到的以下更改来解决问题
chopstick[i].lock();
至
chopstick[min(i,(i+1)%5)].lock();
还有
chopstick[(i+1)%5].lock();
至
chopstick[max(i,(i+1)%5)].lock();
这是我得到的骨架。
#include <windows.h>
#include <stdio.h>
#include <iostream>
#include <vector>
#include <algorithm>
#include <thread>
#include <mutex>
#include <time.h>
using namespace std;
thread task[5];
mutex chopstick[5];
int stop = false;
void go(int i) {
while (!stop) {
chopstick[i].lock();
cout << i << ": takes: " << i << endl;
chrono::milliseconds dur(20);
this_thread::sleep_for(dur); //Leads to deadlock immediately
chopstick[(i + 1) % 5].lock();
cout << i << ": eating" << endl;
chrono::milliseconds dur2(rand() % 200 + 100);
this_thread::sleep_for(dur2);
chopstick[(i + 1) % 5].unlock();
chopstick[i].unlock();
}
}
int main() {
srand(time(NULL));
for (int i = 0; i < 5; ++i) {
task[i] = (thread(go, i));
}
for (int i = 0; i < 5; i++) {
task[i].join();
}
}
我在理论上理解餐饮哲学家,但我无法解决这个问题。我真的不明白我做错了什么。有人可以解释一下我做错了什么并帮我解决吗?
并行编程(使用锁)的核心规则之一是,您应该始终以相同的顺序获取锁。
在您的代码中,每个任务首先获取其锁,然后获取下一个锁。一种解决方案是始终从偶数索引获取锁,然后才从奇数索引获取锁。这样,您获取锁的顺序将保持一致。
另一个众所周知的策略是'backoff',其中您使用lock()
获取第一个锁,并使用try_lock()
获取后续锁,如果无法获取,您释放所有获得的锁并重新开始序列。这种策略在性能方面并不好,但它保证最终会起作用。
修复死锁的最简单方法是使用专为此目的而发明的std::lock(l1, l2)
。
变化:
chopstick[i].lock();
cout << i << ": takes: " << i << endl;
chrono::milliseconds dur(20);
this_thread::sleep_for(dur); //Leads to deadlock immediately
chopstick[(i + 1) % 5].lock();
至:
std::lock(chopstick[i], chopstick[(i + 1) % 5]);
cout << i << ": takes: " << i << endl;
这是一个直接的解决方案,忽略了异常安全,这对于你的第一个死锁避免课程来说很好。
为了使其异常安全,需要将互斥量包装在 RAII 设备中:std::unique_lock
:
unique_lock<mutex> left{chopstick[i], defer_lock};
unique_lock<mutex> right{chopstick[(i + 1) % 5], defer_lock};
lock(left, right);
cout << i << ": takes: " << i << endl;
然后你还应该删除显式的 unlock
语句,因为 left
和 right
的析构函数会处理这些。
现在,如果 go
中有任何异常抛出,left
和 right
的析构函数将在异常传播出去时解锁互斥量。
要了解更多关于 std::lock
幕后发生的事情以避免死锁,请参阅:http://howardhinnant.github.io/dining_philosophers.html
性能测试
这是一个快速简单的测试,用于比较 std::lock
与更传统的建议 "order your mutexes" 的使用。
#ifndef USE_STD_LOCK
# error #define USE_STD_LOCK as 1 to use std::lock and as 0 to use ordering
#endif
#include <atomic>
#include <chrono>
#include <exception>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <thread>
std::thread task[5];
constexpr auto N = sizeof(task)/sizeof(task[0]);
std::mutex chopstick[N];
std::atomic<bool> stop{false};
unsigned long long counts[N] = {};
using namespace std::chrono_literals;
void
go(decltype(N) i)
{
auto const right = (i + 1) % N;
decltype(right) const left = i;
while (!stop)
{
#if USE_STD_LOCK
std::lock(chopstick[left], chopstick[right]);
#else
if (left < right)
{
chopstick[left].lock();
chopstick[right].lock();
}
else
{
chopstick[right].lock();
chopstick[left].lock();
}
#endif
std::lock_guard<std::mutex> l1{chopstick[left], std::adopt_lock};
std::lock_guard<std::mutex> l2{chopstick[right], std::adopt_lock};
++counts[i];
std::this_thread::sleep_for(1ms);
}
}
void
deadlock_detector(std::chrono::seconds time_out)
{
std::this_thread::sleep_for(time_out);
std::cerr << "Deadlock!\n";
std::terminate();
}
int
main()
{
for (auto i = 0u; i < N; ++i)
task[i] = std::thread{go, i};
std::thread{deadlock_detector, 15s}.detach();
std::this_thread::sleep_for(10s);
stop = true;
for (auto& t : task)
t.join();
std::cout << std::right;
for (auto c : counts)
std::cout << std::setw(6) << c << '\n';
auto count = std::accumulate(std::begin(counts), std::end(counts), 0ULL);
std::cout << "+ ----\n";
std::cout << std::setw(6) << count << '\n';
}
编译时必须定义 USE_STD_LOCK
:
#define USE_STD_LOCK 0
对您的互斥量进行排序并按顺序锁定它们。#define USE_STD_LOCK 1
用std::lock
锁定你的互斥量。
程序 运行s 持续 10 秒,每个线程尽可能频繁地递增一个不同的 unsigned long long
。但是为了让事情变得更加戏剧化,每个线程在持有锁的同时也会休眠 1 毫秒(运行 如果你愿意,可以不休眠)。
10 秒后,main
告诉每个人轮班结束并计算每个线程的结果,以及所有线程的总增量。越高越好。
运行 优化已启用,我得到的数字如下:
USE_STD_LOCK = 1
3318
2644
3254
3004
2876
+ ----
15096
USE_STD_LOCK = 0
19
96
1641
5885
50
+ ----
7691
请注意,使用 std::lock
不仅会导致 多 更高的累积结果,而且每个线程对总数的贡献大致相同。相比之下,"ordering" 倾向于选择单线程,使其他线程处于饥饿状态,在某些情况下非常严重。
这是在 4 核英特尔酷睿 i5 上。我将差异归因于拥有多个内核,这样至少有两个线程可以同时 运行。如果这是 运行 在单个核心设备上,我不会期望这种差异(我没有测试该配置)。
我还使用死锁检测器对测试进行了检测。这不会影响我得到的结果。它旨在让人们尝试其他锁定算法,并更快地确定测试是否已锁定。如果这个死锁检测器以任何方式打扰您,只需将它从您的测试中删除 运行。我不想争论它的优点。
我欢迎您提供建设性的反馈,无论您得到类似的结果还是不同的结果。或者,如果您认为此测试存在某种偏见,以及如何改进它。
C++17 更新
在 C++17 及更高版本中,与使用 unique_lock
/lock_guard
相比,调用 std::lock
的方法更为简洁。您可以使用 scoped_lock
而不是在后台为您调用 std::lock
。即改变:
unique_lock<mutex> left{chopstick[i], defer_lock};
unique_lock<mutex> right{chopstick[(i + 1) % 5], defer_lock};
lock(left, right);
至:
scoped_lock lk{chopstick[i], chopstick[(i + 1) % 5]};
这一行在一个方便的包中为您提供 std::lock
的死锁避免和 unique_lock
/lock_guard
的异常安全.
有四 (4) 个条件需要并且足以产生死锁.
死锁条件
- 资源互斥(资源无法共享)
考虑(请求)的资源不能共享。当允许共享资源时,不会阻止(兄弟)进程在需要时获取资源。
- 资源等待或坚持(部分分配)
进程必须持有已分配的资源,并等待(尝试占用)后续(请求的)资源。当请求新资源时进程必须释放持有的资源时,不会发生死锁,因为进程不会阻止(兄弟)进程在需要时获取资源。
- 资源不允许抢占(进程平等或公平)
进程在持有时不能带走资源。否则,更高优先级(排名)的进程将简单地占用(占用)足够的资源以使进程能够完成。许多RTOS使用这种方法来防止死锁。
- 资源循环顺序或等待(资源图中存在循环)
资源存在循环排序或循环(链),资源不能按部分顺序排列(编号最小..最大)。当可以对资源施加部分顺序时,可以根据该严格顺序获取(锁定)资源,并且不会发生死锁(请参阅循环定理,其中指出 "cycle in a resource graph is necessary so that deadlock can occur")。
哲学家就餐问题(通过实验)构建为呈现所有四个条件,挑战 是决定要避免(打破)哪些条件。一个 经典 答案是更改资源的顺序以打破 循环等待 条件。每个哲学家独立决定如何解决死锁。
- 可共享 - 每个哲学家需要两个叉子,不能共享。
- 坚持不懈 - 每个哲学家必须在拿另一把叉子的同时保留一把叉子。
- 没有抢占 - 没有哲学家会从另一个人那里拿叉子。
- Circular order - 有一个循环,所以两个哲学家碰撞并陷入僵局。
有几个众所周知的解决方案:
- Djikstra 的解决方案 - 为叉子编号 (1 .. N),所有哲学家都按照以下规则取叉子:取编号较低的叉子,然后取编号较高的叉子,并在碰撞时释放所有资源。
- Arbitrator (monitor) - 仲裁员分配叉子,当哲学家想吃饭时,他们询问仲裁员,仲裁员将他们的请求序列化,并在可用时分配(分发)资源(叉子)。
Djikstra 是规范的解决方案 - 为您的叉子编号。