标准 C++ 中的共享递归互斥锁

A shared recursive mutex in standard C++

从 C++11 开始就有了 shared_mutex class planned for C++17. And shared_timed_mutex already in C++14. (Who knows why they came in that order, but whatever.) Then there is a recursive_mutex and a recursive_timed_mutex。我需要的是一个shared_recursive_mutex。我是否遗漏了标准中的某些内容,或者我是否必须再等三年才能获得该标准的标准化版本?

如果目前没有这样的工具,那么只使用标准 C++ 来实现这样一个功能的简单(第一优先级)和高效(第二优先级)是什么?

如果您使用的是 Linux / POSIX 平台,那么您很幸运,因为 C++ 互斥量是根据 POSIX 互斥量建模的。 POSIX 提供了更多的特性,包括递归、进程共享等等。将 POSIX 基元包装到 C++ 类 中是直截了当的。

Good entry point into POSIX threads documentation.

可以使用现有原语构建共享递归互斥体。不过我不建议这样做。

这并不简单,包装现有的 POSIX 实现(或您平台的任何本地实现)很可能会更有效率。

如果您 决定编写自己的实现,使其高效仍然取决于特定于平台的细节,因此您要么为每个平台编写一个具有不同实现的接口平台,或者您正在选择一个平台,并且可以轻松地使用本机(POSIX 或其他)设施。

我当然不会提供示例递归 read/write 锁实现,因为对于 Stack Overflow 答案来说,这是一个完全不合理的工作量。

Recursive 属性 的互斥量使用术语 owner,在 [= 的情况下55=] 定义不明确:多个线程可能同时调用了 .lock_shared()

假设 owner 作为调用 .lock()(不是 .lock_shared()!)的线程,递归共享互斥锁的实现可以简单地从 shared_mutex:

class shared_recursive_mutex: public shared_mutex
{
public:
    void lock(void) {
        std::thread::id this_id = std::this_thread::get_id();
        if(owner == this_id) {
            // recursive locking
            count++;
        }
        else {
            // normal locking
            shared_mutex::lock();
            owner = this_id;
            count = 1;
        }
    }
    void unlock(void) {
        if(count > 1) {
            // recursive unlocking
            count--;
        }
        else {
            // normal unlocking
            owner = std::thread::id();
            count = 0;
            shared_mutex::unlock();
        }
    }

private:
    std::atomic<std::thread::id> owner;
    int count;
};

字段 .owner 需要声明为原子的,因为在 .lock() 方法中它在没有保护并发访问的情况下被检查。

如果您想递归调用 .lock_shared() 方法,您需要维护 所有者映射 ,并且应该使用一些额外的互斥锁来保护对该映射的访问。

允许具有活动 .lock() 的线程调用 .lock_shared() 使实现更加复杂。

最后,允许线程提前锁定从.lock_shared().lock()no-no,因为当两个线程尝试执行该推进时,可能会导致死锁。


同样,recursive shared mutex 的语义非常脆弱,所以最好不要使用它。

这是一个围绕类型 T 的快速线程安全包装器:

template<class T, class Lock>
struct lock_guarded {
  Lock l;
  T* t;
  T* operator->()&&{ return t; }
  template<class Arg>
  auto operator[](Arg&&arg)&&
  -> decltype(std::declval<T&>()[std::declval<Arg>()])
  {
    return (*t)[std::forward<Arg>(arg)];
  }
  T& operator*()&&{ return *t; }
};
constexpr struct emplace_t {} emplace {};
template<class T>
struct mutex_guarded {
  lock_guarded<T, std::unique_lock<std::mutex>>
  get_locked() {
    return {{m},&t};
  }
  lock_guarded<T const, std::unique_lock<std::mutex>>
  get_locked() const {
    return {{m},&t};
  }
  lock_guarded<T, std::unique_lock<std::mutex>>
  operator->() {
    return get_locked();
  }
  lock_guarded<T const, std::unique_lock<std::mutex>>
  operator->() const {
    return get_locked();
  }
  template<class F>
  std::result_of_t<F(T&)>
  operator->*(F&& f) {
    return std::forward<F>(f)(*get_locked());
  }
  template<class F>
  std::result_of_t<F(T const&)>
  operator->*(F&& f) const {
    return std::forward<F>(f)(*get_locked());
  }
  template<class...Args>
  mutex_guarded(emplace_t, Args&&...args):
    t(std::forward<Args>(args)...)
  {}
  mutex_guarded(mutex_guarded&& o):
    t( std::move(*o.get_locked()) )
  {}
  mutex_guarded(mutex_guarded const& o):
    t( *o.get_locked() )
  {}
  mutex_guarded() = default;
  ~mutex_guarded() = default;
  mutex_guarded& operator=(mutex_guarded&& o)
  {
    T tmp = std::move(o.get_locked());
    *get_locked() = std::move(tmp);
    return *this;
  }
  mutex_guarded& operator=(mutex_guarded const& o):
  {
    T tmp = o.get_locked();
    *get_locked() = std::move(tmp);
    return *this;
  }

private:
  std::mutex m;
  T t;
};

您可以使用:

mutex_guarded<std::vector<int>> guarded;
auto s0 = guarded->size();
auto s1 = guarded->*[](auto&&e){return e.size();};

两者做的事情大致相同,只有在互斥量被锁定时才能访问被保护的对象。

从@tsyvarev 的答案中窃取(稍作改动)我们得到:

class shared_recursive_mutex
{
  std::shared_mutex m
public:
  void lock(void) {
    std::thread::id this_id = std::this_thread::get_id();
    if(owner == this_id) {
      // recursive locking
      ++count;
    } else {
      // normal locking
      m.lock();
      owner = this_id;
      count = 1;
    }
  }
  void unlock(void) {
    if(count > 1) {
      // recursive unlocking
      count--;
    } else {
      // normal unlocking
      owner = std::thread::id();
      count = 0;
      m.unlock();
    }
  }
  void lock_shared() {
    std::thread::id this_id = std::this_thread::get_id();
    if (shared_counts->count(this_id)) {
      ++(shared_count.get_locked()[this_id]);
    } else {
      m.lock_shared();
      shared_count.get_locked()[this_id] = 1;
    }
  }
  void unlock_shared() {
    std::thread::id this_id = std::this_thread::get_id();
    auto it = shared_count->find(this_id);
    if (it->second > 1) {
      --(it->second);
    } else {
      shared_count->erase(it);
      m.unlock_shared();
    }
  }
private:
  std::atomic<std::thread::id> owner;
  std::atomic<std::size_t> count;
  mutex_guarded<std::map<std::thread::id, std::size_t>> shared_counts;
};

try_locktry_lock_shared 留作练习。

lock 和 unlock 共享锁定互斥量两次(这是安全的,因为分支实际上是关于 "is this thread in control of the mutex",并且另一个线程无法将答案从 "no" 更改为 "yes"或相反亦然)。你可以用 ->* 而不是 -> 的一个锁来完成它,这会使它更快(以逻辑上的一些复杂性为代价)。


以上不支持先有独占锁,再有共享锁。这很棘手。它不支持拥有一个共享锁,然后升级到一个唯一锁,因为当 2 个线程尝试这样做时,基本上不可能阻止它死锁。

最后一个问题可能是递归共享互斥锁不是一个好主意的原因。

分享我的实现,没有承诺

recursive_shared_mutex.h

#ifndef _RECURSIVE_SHARED_MUTEX_H
#define _RECURSIVE_SHARED_MUTEX_H

#include <thread>
#include <mutex>
#include <map>

struct recursive_shared_mutex
{
public:

    recursive_shared_mutex() :
        m_mtx{}, m_exclusive_thread_id{}, m_exclusive_count{ 0 }, m_shared_locks{}
    {}

    void lock();
    bool try_lock();
    void unlock();

    void lock_shared();
    bool try_lock_shared();
    void unlock_shared();

    recursive_shared_mutex(const recursive_shared_mutex&) = delete;
    recursive_shared_mutex& operator=(const recursive_shared_mutex&) = delete;

private:

    inline bool is_exclusive_locked()
    {
        return m_exclusive_count > 0;
    }

    inline bool is_shared_locked()
    {
        return m_shared_locks.size() > 0;
    }

    inline bool can_exclusively_lock()
    {
        return can_start_exclusive_lock() || can_increment_exclusive_lock();
    }

    inline bool can_start_exclusive_lock()
    {
        return !is_exclusive_locked() && (!is_shared_locked() || is_shared_locked_only_on_this_thread());
    }

    inline bool can_increment_exclusive_lock()
    {
        return is_exclusive_locked_on_this_thread();
    }

    inline bool can_lock_shared()
    {
        return !is_exclusive_locked() || is_exclusive_locked_on_this_thread();
    }

    inline bool is_shared_locked_only_on_this_thread()
    {
        return is_shared_locked_only_on_thread(std::this_thread::get_id());
    }

    inline bool is_shared_locked_only_on_thread(std::thread::id id)
    {
        return m_shared_locks.size() == 1 && m_shared_locks.find(id) != m_shared_locks.end();
    }

    inline bool is_exclusive_locked_on_this_thread()
    {
        return is_exclusive_locked_on_thread(std::this_thread::get_id());
    }

    inline bool is_exclusive_locked_on_thread(std::thread::id id)
    {
        return m_exclusive_count > 0 && m_exclusive_thread_id == id;
    }

    inline void start_exclusive_lock()
    {
        m_exclusive_thread_id = std::this_thread::get_id();
        m_exclusive_count++;
    }

    inline void increment_exclusive_lock()
    {
        m_exclusive_count++;
    }

    inline void decrement_exclusive_lock()
    {
        if (m_exclusive_count == 0)
        {
            throw std::logic_error("Not exclusively locked, cannot exclusively unlock");
        }
        if (m_exclusive_thread_id == std::this_thread::get_id())
        {
            m_exclusive_count--;
        }
        else
        {
            throw std::logic_error("Calling exclusively unlock from the wrong thread");
        }
    }

    inline void increment_shared_lock()
    {
        increment_shared_lock(std::this_thread::get_id());
    }

    inline void increment_shared_lock(std::thread::id id)
    {
        if (m_shared_locks.find(id) == m_shared_locks.end())
        {
            m_shared_locks[id] = 1;
        }
        else
        {
            m_shared_locks[id] += 1;
        }
    }

    inline void decrement_shared_lock()
    {
        decrement_shared_lock(std::this_thread::get_id());
    }

    inline void decrement_shared_lock(std::thread::id id)
    {
        if (m_shared_locks.size() == 0)
        {
            throw std::logic_error("Not shared locked, cannot shared unlock");
        }
        if (m_shared_locks.find(id) == m_shared_locks.end())
        {
            throw std::logic_error("Calling shared unlock from the wrong thread");
        }
        else
        {
            if (m_shared_locks[id] == 1)
            {
                m_shared_locks.erase(id);
            }
            else
            {
                m_shared_locks[id] -= 1;
            }
        }
    }

    std::mutex m_mtx;
    std::thread::id m_exclusive_thread_id;
    size_t m_exclusive_count;
    std::map<std::thread::id, size_t> m_shared_locks;
    std::condition_variable m_cond_var;
};

#endif

recursive_shared_mutex.cpp

#include "recursive_shared_mutex.h"
#include <condition_variable>

void recursive_shared_mutex::lock()
{
    std::unique_lock sync_lock(m_mtx);
    m_cond_var.wait(sync_lock, [this] { return can_exclusively_lock(); });
    if (is_exclusive_locked_on_this_thread())
    {
        increment_exclusive_lock();
    }
    else
    {
        start_exclusive_lock();
    }
}

bool recursive_shared_mutex::try_lock()
{
    std::unique_lock sync_lock(m_mtx);
    if (can_increment_exclusive_lock())
    {
        increment_exclusive_lock();
        return true;
    }
    if (can_start_exclusive_lock())
    {
        start_exclusive_lock();
        return true;
    }
    return false;
}

void recursive_shared_mutex::unlock()
{
    {
        std::unique_lock sync_lock(m_mtx);
        decrement_exclusive_lock();
    }
    m_cond_var.notify_all();
}

void recursive_shared_mutex::lock_shared()
{
    std::unique_lock sync_lock(m_mtx);
    m_cond_var.wait(sync_lock, [this] { return can_lock_shared(); });
    increment_shared_lock();
}

bool recursive_shared_mutex::try_lock_shared()
{
    std::unique_lock sync_lock(m_mtx);
    if (can_lock_shared())
    {
        increment_shared_lock();
        return true;
    }
    return false;
}

void recursive_shared_mutex::unlock_shared()
{
    {
        std::unique_lock sync_lock(m_mtx);
        decrement_shared_lock();
    }
    m_cond_var.notify_all();
}

如果一个线程拥有一个共享锁,它也可以在不放弃它的共享锁的情况下获得一个独占锁。 (这当然需要其他线程当前没有共享锁或独占锁)

反之亦然,拥有独占锁的线程可能获得共享锁。

有趣的是,这些属性还允许锁定 upgradable/downgradable。

暂时升级锁:

recusrive_shared_mutex mtx;
foo bar;

mtx.lock_shared();
if (bar.read() == x)
{
    mtx.lock();
    bar.write(y);
    mtx.unlock();
}
mtx.unlock_shared();

从独占锁降级为共享锁

recusrive_shared_mutex mtx;
foo bar;

mtx.lock();
bar.write(x);
mtx.lock_shared();
mtx.unlock();
while (bar.read() != y)
{
     // Something
}
mtx.unlock_shared();

我搜索了一个 C++ read-write-lock 并遇到了这个相关问题。我们确实需要这样一个 shared_recursive_mutex 来控制从多个线程访问我们的“数据库”class。因此,为了完整起见:如果您正在寻找另一个实施示例(就像我一样),您可能也想考虑这个 link:shared_recursive_mutex implementation using C++17 (on github).

Features

  • C++17
  • Single Header
  • Dependency-free

它有一个缺点:static thread_local 成员通过模板专用于 PhantomType class。因此,您不能真正在同一 (PhantomType) class 的多个单独实例中使用此 shared_recursive_mutex。如果这对您没有限制,请尝试一下。