通过排序 std::mutex 避免死锁
Deadlock avoidance by ordering std::mutex
这里是否有死锁避免逻辑的可移植实现(请参阅标记为“不可移植”的部分):
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>
typedef long Money; //In minor unit.
class Account {
public:
bool transfer(Account& to,const Money amount);
Money get_balance() const;
Account(const Money deposit=0) : balance{deposit} {}
private:
mutable std::mutex lock;
Money balance;
};
bool Account::transfer(Account& to,const Money amount){
std::unique_lock<decltype(this->lock)> flock{this->lock,std::defer_lock};
std::unique_lock<decltype(to.lock)> tlock{to.lock,std::defer_lock};
//NON-PORTABLE:BEGIN: using intptr_t AND assuming Total Strict Order.
const auto fi{reinterpret_cast<const std::intptr_t>(static_cast<const void*>(&this->lock))};
const auto ti{reinterpret_cast<const std::intptr_t>(static_cast<const void*>(&to.lock))};
if(fi<ti){
flock.lock();
tlock.lock();
} else if (fi!=ti) {
tlock.lock();
flock.lock();
} else {
flock.lock();
}
//NON-PORTABLE:END
this->balance-=amount;
to.balance+=amount;
return true;
}
Money Account::get_balance() const{
const std::lock_guard<decltype(this->lock)> guard{this->lock};
return this->balance;
}
void hammer_transfer(Account& from,Account& to,const Money amount, const int tries){
for(int i{1};i<=tries;++i){
from.transfer(to,amount);
}
}
int main() {
constexpr Money open_a{ 200000L};
constexpr Money open_b{ 100000L};
constexpr Money tran_ab{10};
constexpr Money tran_ba{3};
constexpr Money tran_aa{7};
Account A{open_a};
Account B{open_b};
std::cout << "A Open:" << A.get_balance() << '\n';
std::cout << "B Open:" << B.get_balance() << '\n';
constexpr long tries{20000};
std::thread TAB{hammer_transfer,std::ref(A),std::ref(B),tran_ab,tries};
std::thread TBA{hammer_transfer,std::ref(B),std::ref(A),tran_ba,tries};
std::thread TAA{hammer_transfer,std::ref(A),std::ref(A),tran_aa,tries};
TAB.join();
TBA.join();
TAA.join();
const auto close_a{A.get_balance()};
const auto close_b{B.get_balance()};
std::cout << "A Close:" << close_a<< '\n';
std::cout << "B Close:" << close_b<< '\n';
int errors{0};
if((close_a+close_b)!=(open_a+open_b)){
std::cout << "ERROR: Money Leaked!\n";
++errors;
}
if(close_a!=(open_a+tries*(tran_ba-tran_ab)) ||
close_b!=(open_b+tries*(tran_ab-tran_ba))
){
std::cout << "ERROR: 'Lost' Transaction(s)\n";
++errors;
}
if(errors==0){
std::cout << "* SUCCESS *\n";
}else{
std::cout << "** FAILED **\n";
}
std::cout << std::endl;
return 0;
}
可在此处运行:https://ideone.com/hAUfhM
假设是(而且我相信足够 - 有人吗?)intptr_t
存在并且 intptr_t
上的关系运算符暗示对它们表示的指针值的完全严格排序。
假定的排序无法保证,并且可能比指针排序的不可移植性更不便携(例如,如果 intptr_t
比指针宽并且并非所有位都被写入)。
我知道这个设计和其他设计有一些不同的即兴演奏。
我会赞成所有好的答案,即使不是可移植的,这些答案确定了他们对实现的假设,理想情况下是一个他们应用的平台,最好是一个他们不应用的平台!
std::lock() 具有内置的死锁避免算法。
一旦开始出现锁争用,您就无法使用此方法,需要重新考虑整个解决方案。几乎所有的锁都会导致上下文切换,每次都会花费大约 20000 个周期。
通常大多数账户有很多收入(商店、安排)或支出(养老金、救济金等)
一旦你确定了竞争账户,你可以排队大量交易,然后锁定满足账户,运行 交易由 try_lock 另一个账户,如果锁定成功交易已经完成了。尝试 try_lock 几次,然后使用两个锁执行 scope_lock 以获取这两个锁的所有事务。
第 2 部分。
我如何确保我的锁的安全排序,因为比较不在同一区域的指针是 UB。
您为帐户添加一个唯一 ID 并以此进行比较!
tl;dr - 您可以在 C++20 中进行可移植的原始指针比较。我可能会将该代码包装成 scoped_ordered_lock
之类的东西,因为代码仍然有点毛茸茸。
The assumptions are (and I believe sufficient – anyone?) that intptr_t exists and that the relational operators on intptr_t imply a Total Strict Ordering on values when holding values cast from valid non-null pointers to std::mutex.
不准确。你做总是对整数值有一个完全严格的顺序。当从 intptr_t
到指针的映射是 many-to-one 时会出现问题(对于分段地址示例 就是这种情况 - 即 intptr_t
上的 TSO 是不够的)。
指向intptr_t
映射的指针也必须是单射的(不一定是双射,因为我们不关心某些intptr_t
值是否是unused/don' t 表示有效指针)。
无论如何,很明显指针的完全严格排序 可以 存在:它只是 implementation-specific。分段地址可以归一化或扁平化等
幸运的是,提供了一个合适的 implementation-defined 完全严格排序:通过 C++20 中的 3-way 函子 std::compare_three_way
和 2-way 函子 less
, greater
等在 C++20 之前(并且 可能 也在 C++20 中)。
没有关于 implementation-defined strict total order over pointers in the text about the spaceship operator 的等效语言 - 即使 compare_three_way
被描述为调用它 - 或关于其他关系运算符。
这似乎是故意的,因此内置运算符 <
、>
、<=
、>=
和 <=>
不会获得在某些平台上可能代价高昂的新约束。事实上,双向关系运算符被明确描述为指针上的 partial order。
因此,这应该与您的原始代码相同,除了可移植的:
const auto order = std::compare_three_way{}(&this->lock, &to.lock);
if(order == std::strong_ordering::less){
flock.lock();
tlock.lock();
} else if (order == std::strong_ordering::greater) {
tlock.lock();
flock.lock();
} else {
flock.lock();
}
备注
C++20(特别是 PDF:P1961R0), [comparisons.general] 说
For templates less
, greater
, less_equal
, and greater_equal
, the specializations for any pointer type yield a result consistent with the implementation-defined strict total order over pointers
这是一个较弱的要求,允许他们提供部分订单,只要它永远不会与总订单不一致。这是否是故意削弱并不明显,或者只是想说他们必须执行其他地方定义的 相同 总订单。
在 C++20 之前 less
等 did 需要对这些进行总排序仿函数。
无论如何,如果您无法访问 C++20 和 compare_three_way
,您的 less
等 保证提供您需要的总订购量。只是不要依赖原始关系运算符。
这是一个展示修改后代码的自我回答。功劳归功于上面接受的答案。
对我来说,学习是因为 C++14 std::less
、std::greater
等在指针上定义了一个 Strict Total,它与已经由 <
和 [=14 定义的偏序一致=]等
通过使用这些模板,此代码现在可以保证没有死锁。
在 C++20 中,使用 std::compare_three_way<>
.
可以使它更整洁并且可能更快
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
typedef long Money; //In minor unit.
class Account {
public:
bool transfer(Account& to,const Money amount);
Money get_balance() const;
Account(const Money deposit=0) : balance{deposit} {}
private:
mutable std::mutex lock;
Money balance;
};
namespace{
std::less<void*> less{};
std::equal_to<void*> equal_to{};
}
bool Account::transfer(Account& to,const Money amount){
std::unique_lock<decltype(this->lock)> flock{this->lock,std::defer_lock};
std::unique_lock<decltype(to.lock)> tlock{to.lock,std::defer_lock};
if(less(&this->lock,&to.lock)){
flock.lock();
tlock.lock();
} else if(equal_to(&this->lock,&to.lock)) {
flock.lock();
} else {
tlock.lock();
flock.lock();
}
this->balance-=amount;
to.balance+=amount;
return true;
}
Money Account::get_balance() const{
const std::lock_guard<decltype(this->lock)> guard{this->lock};
return this->balance;
}
void hammer_transfer(Account& from,Account& to,const Money amount, const int tries){
for(int i{1};i<=tries;++i){
from.transfer(to,amount);
}
}
int main() {
constexpr Money open_a{ 200000L};
constexpr Money open_b{ 100000L};
constexpr Money tran_ab{10};
constexpr Money tran_ba{3};
constexpr Money tran_aa{7};
Account A{open_a};
Account B{open_b};
std::cout << "A Open:" << A.get_balance() << '\n';
std::cout << "B Open:" << B.get_balance() << '\n';
constexpr long tries{20000};
std::thread TAB{hammer_transfer,std::ref(A),std::ref(B),tran_ab,tries};
std::thread TBA{hammer_transfer,std::ref(B),std::ref(A),tran_ba,tries};
std::thread TAA{hammer_transfer,std::ref(A),std::ref(A),tran_aa,tries};
TAB.join();
TBA.join();
TAA.join();
const auto close_a{A.get_balance()};
const auto close_b{B.get_balance()};
std::cout << "A Close:" << close_a<< '\n';
std::cout << "B Close:" << close_b<< '\n';
int errors{0};
if((close_a+close_b)!=(open_a+open_b)){
std::cout << "ERROR: Money Leaked!\n";
++errors;
}
if(close_a!=(open_a+tries*(tran_ba-tran_ab)) ||
close_b!=(open_b+tries*(tran_ab-tran_ba))
){
std::cout << "ERROR: 'Lost' Transaction(s)\n";
++errors;
}
if(errors==0){
std::cout << "* SUCCESS *\n";
}else{
std::cout << "** FAILED **\n";
}
std::cout << std::endl;
return 0;
}
这里是否有死锁避免逻辑的可移植实现(请参阅标记为“不可移植”的部分):
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>
typedef long Money; //In minor unit.
class Account {
public:
bool transfer(Account& to,const Money amount);
Money get_balance() const;
Account(const Money deposit=0) : balance{deposit} {}
private:
mutable std::mutex lock;
Money balance;
};
bool Account::transfer(Account& to,const Money amount){
std::unique_lock<decltype(this->lock)> flock{this->lock,std::defer_lock};
std::unique_lock<decltype(to.lock)> tlock{to.lock,std::defer_lock};
//NON-PORTABLE:BEGIN: using intptr_t AND assuming Total Strict Order.
const auto fi{reinterpret_cast<const std::intptr_t>(static_cast<const void*>(&this->lock))};
const auto ti{reinterpret_cast<const std::intptr_t>(static_cast<const void*>(&to.lock))};
if(fi<ti){
flock.lock();
tlock.lock();
} else if (fi!=ti) {
tlock.lock();
flock.lock();
} else {
flock.lock();
}
//NON-PORTABLE:END
this->balance-=amount;
to.balance+=amount;
return true;
}
Money Account::get_balance() const{
const std::lock_guard<decltype(this->lock)> guard{this->lock};
return this->balance;
}
void hammer_transfer(Account& from,Account& to,const Money amount, const int tries){
for(int i{1};i<=tries;++i){
from.transfer(to,amount);
}
}
int main() {
constexpr Money open_a{ 200000L};
constexpr Money open_b{ 100000L};
constexpr Money tran_ab{10};
constexpr Money tran_ba{3};
constexpr Money tran_aa{7};
Account A{open_a};
Account B{open_b};
std::cout << "A Open:" << A.get_balance() << '\n';
std::cout << "B Open:" << B.get_balance() << '\n';
constexpr long tries{20000};
std::thread TAB{hammer_transfer,std::ref(A),std::ref(B),tran_ab,tries};
std::thread TBA{hammer_transfer,std::ref(B),std::ref(A),tran_ba,tries};
std::thread TAA{hammer_transfer,std::ref(A),std::ref(A),tran_aa,tries};
TAB.join();
TBA.join();
TAA.join();
const auto close_a{A.get_balance()};
const auto close_b{B.get_balance()};
std::cout << "A Close:" << close_a<< '\n';
std::cout << "B Close:" << close_b<< '\n';
int errors{0};
if((close_a+close_b)!=(open_a+open_b)){
std::cout << "ERROR: Money Leaked!\n";
++errors;
}
if(close_a!=(open_a+tries*(tran_ba-tran_ab)) ||
close_b!=(open_b+tries*(tran_ab-tran_ba))
){
std::cout << "ERROR: 'Lost' Transaction(s)\n";
++errors;
}
if(errors==0){
std::cout << "* SUCCESS *\n";
}else{
std::cout << "** FAILED **\n";
}
std::cout << std::endl;
return 0;
}
可在此处运行:https://ideone.com/hAUfhM
假设是(而且我相信足够 - 有人吗?)intptr_t
存在并且 intptr_t
上的关系运算符暗示对它们表示的指针值的完全严格排序。
假定的排序无法保证,并且可能比指针排序的不可移植性更不便携(例如,如果 intptr_t
比指针宽并且并非所有位都被写入)。
我知道这个设计和其他设计有一些不同的即兴演奏。 我会赞成所有好的答案,即使不是可移植的,这些答案确定了他们对实现的假设,理想情况下是一个他们应用的平台,最好是一个他们不应用的平台!
std::lock() 具有内置的死锁避免算法。
一旦开始出现锁争用,您就无法使用此方法,需要重新考虑整个解决方案。几乎所有的锁都会导致上下文切换,每次都会花费大约 20000 个周期。
通常大多数账户有很多收入(商店、安排)或支出(养老金、救济金等)
一旦你确定了竞争账户,你可以排队大量交易,然后锁定满足账户,运行 交易由 try_lock 另一个账户,如果锁定成功交易已经完成了。尝试 try_lock 几次,然后使用两个锁执行 scope_lock 以获取这两个锁的所有事务。
第 2 部分。 我如何确保我的锁的安全排序,因为比较不在同一区域的指针是 UB。
您为帐户添加一个唯一 ID 并以此进行比较!
tl;dr - 您可以在 C++20 中进行可移植的原始指针比较。我可能会将该代码包装成 scoped_ordered_lock
之类的东西,因为代码仍然有点毛茸茸。
The assumptions are (and I believe sufficient – anyone?) that intptr_t exists and that the relational operators on intptr_t imply a Total Strict Ordering on values when holding values cast from valid non-null pointers to std::mutex.
不准确。你做总是对整数值有一个完全严格的顺序。当从 intptr_t
到指针的映射是 many-to-one 时会出现问题(对于分段地址示例 intptr_t
上的 TSO 是不够的)。
指向intptr_t
映射的指针也必须是单射的(不一定是双射,因为我们不关心某些intptr_t
值是否是unused/don' t 表示有效指针)。
无论如何,很明显指针的完全严格排序 可以 存在:它只是 implementation-specific。分段地址可以归一化或扁平化等
幸运的是,提供了一个合适的 implementation-defined 完全严格排序:通过 C++20 中的 3-way 函子 std::compare_three_way
和 2-way 函子 less
, greater
等在 C++20 之前(并且 可能 也在 C++20 中)。
没有关于 implementation-defined strict total order over pointers in the text about the spaceship operator 的等效语言 - 即使 compare_three_way
被描述为调用它 - 或关于其他关系运算符。
这似乎是故意的,因此内置运算符 <
、>
、<=
、>=
和 <=>
不会获得在某些平台上可能代价高昂的新约束。事实上,双向关系运算符被明确描述为指针上的 partial order。
因此,这应该与您的原始代码相同,除了可移植的:
const auto order = std::compare_three_way{}(&this->lock, &to.lock);
if(order == std::strong_ordering::less){
flock.lock();
tlock.lock();
} else if (order == std::strong_ordering::greater) {
tlock.lock();
flock.lock();
} else {
flock.lock();
}
备注
C++20(特别是 PDF:P1961R0), [comparisons.general] 说
For templates
less
,greater
,less_equal
, andgreater_equal
, the specializations for any pointer type yield a result consistent with the implementation-defined strict total order over pointers这是一个较弱的要求,允许他们提供部分订单,只要它永远不会与总订单不一致。这是否是故意削弱并不明显,或者只是想说他们必须执行其他地方定义的 相同 总订单。
在 C++20 之前
less
等 did 需要对这些进行总排序仿函数。
无论如何,如果您无法访问 C++20 和 compare_three_way
,您的 less
等 保证提供您需要的总订购量。只是不要依赖原始关系运算符。
这是一个展示修改后代码的自我回答。功劳归功于上面接受的答案。
对我来说,学习是因为 C++14 std::less
、std::greater
等在指针上定义了一个 Strict Total,它与已经由 <
和 [=14 定义的偏序一致=]等
通过使用这些模板,此代码现在可以保证没有死锁。
在 C++20 中,使用 std::compare_three_way<>
.
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
typedef long Money; //In minor unit.
class Account {
public:
bool transfer(Account& to,const Money amount);
Money get_balance() const;
Account(const Money deposit=0) : balance{deposit} {}
private:
mutable std::mutex lock;
Money balance;
};
namespace{
std::less<void*> less{};
std::equal_to<void*> equal_to{};
}
bool Account::transfer(Account& to,const Money amount){
std::unique_lock<decltype(this->lock)> flock{this->lock,std::defer_lock};
std::unique_lock<decltype(to.lock)> tlock{to.lock,std::defer_lock};
if(less(&this->lock,&to.lock)){
flock.lock();
tlock.lock();
} else if(equal_to(&this->lock,&to.lock)) {
flock.lock();
} else {
tlock.lock();
flock.lock();
}
this->balance-=amount;
to.balance+=amount;
return true;
}
Money Account::get_balance() const{
const std::lock_guard<decltype(this->lock)> guard{this->lock};
return this->balance;
}
void hammer_transfer(Account& from,Account& to,const Money amount, const int tries){
for(int i{1};i<=tries;++i){
from.transfer(to,amount);
}
}
int main() {
constexpr Money open_a{ 200000L};
constexpr Money open_b{ 100000L};
constexpr Money tran_ab{10};
constexpr Money tran_ba{3};
constexpr Money tran_aa{7};
Account A{open_a};
Account B{open_b};
std::cout << "A Open:" << A.get_balance() << '\n';
std::cout << "B Open:" << B.get_balance() << '\n';
constexpr long tries{20000};
std::thread TAB{hammer_transfer,std::ref(A),std::ref(B),tran_ab,tries};
std::thread TBA{hammer_transfer,std::ref(B),std::ref(A),tran_ba,tries};
std::thread TAA{hammer_transfer,std::ref(A),std::ref(A),tran_aa,tries};
TAB.join();
TBA.join();
TAA.join();
const auto close_a{A.get_balance()};
const auto close_b{B.get_balance()};
std::cout << "A Close:" << close_a<< '\n';
std::cout << "B Close:" << close_b<< '\n';
int errors{0};
if((close_a+close_b)!=(open_a+open_b)){
std::cout << "ERROR: Money Leaked!\n";
++errors;
}
if(close_a!=(open_a+tries*(tran_ba-tran_ab)) ||
close_b!=(open_b+tries*(tran_ab-tran_ba))
){
std::cout << "ERROR: 'Lost' Transaction(s)\n";
++errors;
}
if(errors==0){
std::cout << "* SUCCESS *\n";
}else{
std::cout << "** FAILED **\n";
}
std::cout << std::endl;
return 0;
}