通过排序 std::mutex 避免死锁

Deadlock avoidance by ordering std::mutex

这里是否有死锁避免逻辑的可移植实现(请参阅标记为“不可移植”的部分):

#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>

typedef long Money; //In minor unit.

class Account {
public:
    bool transfer(Account& to,const Money amount);
    Money get_balance() const;
    Account(const Money deposit=0) : balance{deposit} {}
private:
    mutable std::mutex lock;
    Money balance;
};

bool Account::transfer(Account& to,const Money amount){
    std::unique_lock<decltype(this->lock)> flock{this->lock,std::defer_lock};
    std::unique_lock<decltype(to.lock)> tlock{to.lock,std::defer_lock};
//NON-PORTABLE:BEGIN: using intptr_t AND assuming Total Strict Order.
    const auto fi{reinterpret_cast<const std::intptr_t>(static_cast<const void*>(&this->lock))};
    const auto ti{reinterpret_cast<const std::intptr_t>(static_cast<const void*>(&to.lock))};
    if(fi<ti){
        flock.lock();
        tlock.lock();
    } else if (fi!=ti) {
        tlock.lock();
        flock.lock();
    } else {
        flock.lock();
    }
//NON-PORTABLE:END  
    this->balance-=amount;
    to.balance+=amount;
    return true;
}

Money Account::get_balance() const{
    const std::lock_guard<decltype(this->lock)> guard{this->lock};
    return this->balance;
}

void hammer_transfer(Account& from,Account& to,const Money amount, const int tries){
    for(int i{1};i<=tries;++i){
        from.transfer(to,amount);
    }
}

int main() {
    constexpr Money open_a{ 200000L};
    constexpr Money open_b{ 100000L};
    constexpr Money tran_ab{10};
    constexpr Money tran_ba{3};
    constexpr Money tran_aa{7};

    Account A{open_a};
    Account B{open_b};
    
    std::cout << "A Open:" << A.get_balance() << '\n';
    std::cout << "B Open:" << B.get_balance() << '\n';
    
    constexpr long tries{20000}; 
    std::thread TAB{hammer_transfer,std::ref(A),std::ref(B),tran_ab,tries};
    std::thread TBA{hammer_transfer,std::ref(B),std::ref(A),tran_ba,tries};
    std::thread TAA{hammer_transfer,std::ref(A),std::ref(A),tran_aa,tries};

    TAB.join();
    TBA.join();
    TAA.join();

    const auto close_a{A.get_balance()};
    const auto close_b{B.get_balance()};   
    
    std::cout << "A Close:" << close_a<< '\n';
    std::cout << "B Close:" << close_b<< '\n';
    
    int errors{0};
    if((close_a+close_b)!=(open_a+open_b)){
        std::cout << "ERROR: Money Leaked!\n";
        ++errors;
    }
    if(close_a!=(open_a+tries*(tran_ba-tran_ab)) ||
          close_b!=(open_b+tries*(tran_ab-tran_ba))
    ){
        std::cout << "ERROR: 'Lost' Transaction(s)\n";
        ++errors;
    }
    if(errors==0){
        std::cout << "* SUCCESS *\n";
    }else{
        std::cout << "** FAILED **\n";
    }
    std::cout << std::endl;
    return 0;
}

可在此处运行:https://ideone.com/hAUfhM

假设是(而且我相信足够 - 有人吗?)intptr_t 存在并且 intptr_t 上的关系运算符暗示对它们表示的指针值的完全严格排序。

假定的排序无法保证,并且可能比指针排序的不可移植性更不便携(例如,如果 intptr_t 比指针宽并且并非所有位都被写入)。

我知道这个设计和其他设计有一些不同的即兴演奏。 我会赞成所有好的答案,即使不是可移植的,这些答案确定了他们对实现的假设,理想情况下是一个他们应用的平台,最好是一个他们不应用的平台!

std::lock() 具有内置的死锁避免算法。

https://en.cppreference.com/w/cpp/thread/lock

一旦开始出现锁争用,您就无法使用此方法,需要重新考虑整个解决方案。几乎所有的锁都会导致上下文切换,每次都会花费大约 20000 个周期。

通常大多数账户有很多收入(商店、安排)或支出(养老金、救济金等)

一旦你确定了竞争账户,你可以排队大量交易,然后锁定满足账户,运行 交易由 try_lock 另一个账户,如果锁定成功交易已经完成了。尝试 try_lock 几次,然后使用两个锁执行 scope_lock 以获取这两个锁的所有事务。

第 2 部分。 我如何确保我的锁的安全排序,因为比较不在同一区域的指针是 UB。

您为帐户添加一个唯一 ID 并以此进行比较!

tl;dr - 您可以在 C++20 中进行可移植的原始指针比较。我可能会将该代码包装成 scoped_ordered_lock 之类的东西,因为代码仍然有点毛茸茸。


The assumptions are (and I believe sufficient – anyone?) that intptr_t exists and that the relational operators on intptr_t imply a Total Strict Ordering on values when holding values cast from valid non-null pointers to std::mutex.

不准确。你总是对整数值有一个完全严格的顺序。当从 intptr_t 到指针的映射是 many-to-one 时会出现问题(对于分段地址示例 就是这种情况 - 即 intptr_t 上的 TSO 是不够的)。

指向intptr_t映射的指针也必须是单射的(不一定是双射,因为我们不关心某些intptr_t值是否是unused/don' t 表示有效指针)。

无论如何,很明显指针的完全严格排序 可以 存在:它只是 implementation-specific。分段地址可以归一化或扁平化等

幸运的是,提供了一个合适的 implementation-defined 完全严格排序:通过 C++20 中的 3-way 函子 std::compare_three_way 和 2-way 函子 lessgreater 等在 C++20 之前(并且 可能 也在 C++20 中)。

没有关于 implementation-defined strict total order over pointers in the text about the spaceship operator 的等效语言 - 即使 compare_three_way 被描述为调用它 - 或关于其他关系运算符。

这似乎是故意的,因此内置运算符 <><=>=<=> 不会获得在某些平台上可能代价高昂的新约束。事实上,双向关系运算符被明确描述为指针上的 partial order

因此,这应该与您的原始代码相同,除了可移植的:

const auto order = std::compare_three_way{}(&this->lock, &to.lock);
if(order == std::strong_ordering::less){
    flock.lock();
    tlock.lock();
} else if (order == std::strong_ordering::greater) {
    tlock.lock();
    flock.lock();
} else {
    flock.lock();
}

备注

  • C++20(特别是 PDF:P1961R0), [comparisons.general] 说

    For templates less, greater, less_­equal, and greater_­equal, the specializations for any pointer type yield a result consistent with the implementation-defined strict total order over pointers

    这是一个较弱的要求,允许他们提供部分订单,只要它永远不会与总订单不一致。这是否是故意削弱并不明显,或者只是想说他们必须执行其他地方定义的 相同 总订单。

  • 在 C++20 之前 lessdid 需要对这些进行总排序仿函数。

无论如何,如果您无法访问 C++20 和 compare_three_way,您的 less 保证提供您需要的总订购量。只是不要依赖原始关系运算符。

这是一个展示修改后代码的自我回答。功劳归功于上面接受的答案。 对我来说,学习是因为 C++14 std::lessstd::greater 等在指针上定义了一个 Strict Total,它与已经由 < 和 [=14 定义的偏序一致=]等

通过使用这些模板,此代码现在可以保证没有死锁。 在 C++20 中,使用 std::compare_three_way<>.

可以使它更整洁并且可能更快

https://ideone.com/ekuf2f

#include <functional>    
#include <iostream>
#include <mutex>
#include <thread>

typedef long Money; //In minor unit.

class Account {
public:
    bool transfer(Account& to,const Money amount);
    Money get_balance() const;
    Account(const Money deposit=0) : balance{deposit} {}
private:
    mutable std::mutex lock;
    Money balance;
};

namespace{
    std::less<void*> less{};
    std::equal_to<void*> equal_to{};
}

bool Account::transfer(Account& to,const Money amount){
    std::unique_lock<decltype(this->lock)> flock{this->lock,std::defer_lock};
    std::unique_lock<decltype(to.lock)> tlock{to.lock,std::defer_lock};
    if(less(&this->lock,&to.lock)){
        flock.lock();
        tlock.lock();
    } else if(equal_to(&this->lock,&to.lock)) {
        flock.lock();
    } else {
        tlock.lock();
        flock.lock();
    }
    this->balance-=amount;
    to.balance+=amount;
    return true;
}

Money Account::get_balance() const{
    const std::lock_guard<decltype(this->lock)> guard{this->lock};
    return this->balance;
}

void hammer_transfer(Account& from,Account& to,const Money amount, const int tries){
    for(int i{1};i<=tries;++i){
        from.transfer(to,amount);
    }
}

int main() {
    constexpr Money open_a{ 200000L};
     constexpr Money open_b{ 100000L};
    constexpr Money tran_ab{10};
    constexpr Money tran_ba{3};
    constexpr Money tran_aa{7};

    Account A{open_a};
    Account B{open_b};
    
    std::cout << "A Open:" << A.get_balance() << '\n';
    std::cout << "B Open:" << B.get_balance() << '\n';
    
    constexpr long tries{20000}; 
    std::thread TAB{hammer_transfer,std::ref(A),std::ref(B),tran_ab,tries};
    std::thread TBA{hammer_transfer,std::ref(B),std::ref(A),tran_ba,tries};
    std::thread TAA{hammer_transfer,std::ref(A),std::ref(A),tran_aa,tries};

    TAB.join();
    TBA.join();
    TAA.join();

    const auto close_a{A.get_balance()};
    const auto close_b{B.get_balance()};   
    
    std::cout << "A Close:" << close_a<< '\n';
    std::cout << "B Close:" << close_b<< '\n';
    
    int errors{0};
    if((close_a+close_b)!=(open_a+open_b)){
        std::cout << "ERROR: Money Leaked!\n";
        ++errors;
    }
    if(close_a!=(open_a+tries*(tran_ba-tran_ab)) ||
          close_b!=(open_b+tries*(tran_ab-tran_ba))
    ){
        std::cout << "ERROR: 'Lost' Transaction(s)\n";
        ++errors;
    }
    if(errors==0){
        std::cout << "* SUCCESS *\n";
    }else{
        std::cout << "** FAILED **\n";
    }
    std::cout << std::endl;
    return 0;
}