我应该如何处理 C++ 中可移动类型的互斥量?

How should I deal with mutexes in movable types in C++?

根据设计,std::mutex 不可移动也不可复制。这意味着持有互斥量的 class A 不会收到默认的移动构造函数。

如何使这种类型 A 以线程安全的方式移动?

首先,如果你想移动一个包含互斥量的对象,你的设计肯定有问题。

但是如果你决定这样做,你必须在移动构造函数中创建一个新的互斥锁,例如:

// movable
struct B{};

class A {
    B b;
    std::mutex m;
public:
    A(A&& a)
        : b(std::move(a.b))
        // m is default-initialized.
    {
    }
};

这是线程安全的,因为移动构造函数可以安全地假定其参数未在其他任何地方使用,因此不需要锁定参数。

鉴于似乎没有一个很好、干净、简单的方法来回答这个问题 - 我 认为 安东的解决方案是正确的,但它绝对值得商榷,除非有更好的答案up 我建议将这样的 class 放在堆上并通过 std::unique_ptr:

来处理它
auto a = std::make_unique<A>();

它现在是一个完全可移动的类型,任何在移动发生时锁定内部互斥锁的人仍然是安全的,即使这是否是一件好事值得商榷

如果您需要复制语义,只需使用

auto a2 = std::make_shared<A>();

使用互斥锁和 C++ 移动语义是在线程之间安全高效地传输数据的绝佳方式。

想象一个 'producer' 线程生成一批字符串并将它们提供给(一个或多个)消费者。这些批次可以由包含(可能很大)std::vector<std::string> 个对象的对象表示。 我们绝对希望 'move' 将这些向量的内部状态传递给它们的消费者,而不会出现不必要的重复。

您只需将互斥体识别为对象的一部分,而不是对象状态的一部分。也就是说,您不想移动互斥量。

您需要什么样的锁定取决于您的算法或您的对象的泛化程度以及您允许的使用范围。

如果您只从共享状态 'producer' 对象移动到线程本地 'consuming' 对象,您可能只锁定移动的 from 对象。

如果是更通用的设计,则需要同时锁定两者。在这种情况下,您需要考虑死锁。

如果这是一个潜在的问题,那么使用 std::lock() 以无死锁的方式获取两个互斥量的锁。

http://en.cppreference.com/w/cpp/thread/lock

最后一点,您需要确保您理解移动语义。 回想一下,被移出的对象处于有效但未知的状态。 不执行移动的线程完全有可能有正当理由尝试访问移动的对象,当它可能发现有效但未知的状态时。

同样,我的制作人只是在敲弦乐,而消费者正在带走全部负载。在那种情况下,每次生产者尝试添加到向量时,它可能会发现向量非空或为空。

简而言之,如果对移动对象的潜在并发访问相当于一次写入,则可能没问题。如果它等于读取,那么想想为什么读取任意状态都可以。

让我们从一些代码开始:

class A
{
    using MutexType = std::mutex;
    using ReadLock = std::unique_lock<MutexType>;
    using WriteLock = std::unique_lock<MutexType>;

    mutable MutexType mut_;

    std::string field1_;
    std::string field2_;

public:
    ...

我在那里放了一些相当有启发性的类型别名,我们在 C++11 中不会真正利用这些别名,但在 C++14 中会变得更有用。请耐心等待,我们会到达那里。

您的问题归结为:

How do I write the move constructor and move assignment operator for this class?

我们将从移动构造函数开始。

移动构造函数

请注意,成员 mutex 已创建 mutable。严格来说,这对于 move 成员来说不是必需的,但我假设您也想要 copy 成员。如果不是这种情况,则无需创建互斥锁 mutable.

构造A时,不需要加锁this->mut_。但是你确实需要锁定你正在构建的对象的 mut_ (移动或复制)。可以这样做:

    A(A&& a)
    {
        WriteLock rhs_lk(a.mut_);
        field1_ = std::move(a.field1_);
        field2_ = std::move(a.field2_);
    }

注意我们必须先默认构造this的成员,只有在a.mut_被锁定后才给它们赋值。

移动作业

移动赋值运算符要复杂得多,因为您不知道是否有其他线程正在访问赋值表达式的 lhs 或 rhs。而一般情况下,您需要防范以下情况:

// Thread 1
x = std::move(y);

// Thread 2
y = std::move(x);

下面是正确保护上述场景的移动赋值运算符:

    A& operator=(A&& a)
    {
        if (this != &a)
        {
            WriteLock lhs_lk(mut_, std::defer_lock);
            WriteLock rhs_lk(a.mut_, std::defer_lock);
            std::lock(lhs_lk, rhs_lk);
            field1_ = std::move(a.field1_);
            field2_ = std::move(a.field2_);
        }
        return *this;
    }

注意必须使用std::lock(m1, m2)来锁定两个互斥量,而不是一个接一个地锁定它们。如果你一个接一个地锁定它们,那么当两个线程如上所示以相反的顺序分配两个对象时,你就会陷入死锁。 std::lock 的目的是避免死锁。

复制构造函数

你没有问副本成员,但我们现在不妨谈谈他们(如果不是你,有人会需要他们)。

    A(const A& a)
    {
        ReadLock  rhs_lk(a.mut_);
        field1_ = a.field1_;
        field2_ = a.field2_;
    }

除了使用 ReadLock 别名而不是 WriteLock 之外,复制构造函数看起来很像移动构造函数。目前这两个别名 std::unique_lock<std::mutex> 所以它并没有什么区别。

但在 C++14 中,您可以选择这样说:

    using MutexType = std::shared_timed_mutex;
    using ReadLock  = std::shared_lock<MutexType>;
    using WriteLock = std::unique_lock<MutexType>;

可能是一种优化,但不一定。您将必须测量以确定它是否是。但是随着这一变化,人们可以同时在多个线程中从复制构造相同的rhs。 C++11 解决方案强制您使此类线程成为顺序线程,即使 rhs 没有被修改。

复制作业

为了完整起见,这里是复制赋值运算符,在阅读其他所有内容后应该是相当自我解释的:

    A& operator=(const A& a)
    {
        if (this != &a)
        {
            WriteLock lhs_lk(mut_, std::defer_lock);
            ReadLock  rhs_lk(a.mut_, std::defer_lock);
            std::lock(lhs_lk, rhs_lk);
            field1_ = a.field1_;
            field2_ = a.field2_;
        }
        return *this;
    }

等等

如果您希望多个线程能够同时调用它们,那么访问 A 状态的任何其他成员或自由函数也需要受到保护。例如,这里是 swap:

    friend void swap(A& x, A& y)
    {
        if (&x != &y)
        {
            WriteLock lhs_lk(x.mut_, std::defer_lock);
            WriteLock rhs_lk(y.mut_, std::defer_lock);
            std::lock(lhs_lk, rhs_lk);
            using std::swap;
            swap(x.field1_, y.field1_);
            swap(x.field2_, y.field2_);
        }
    }

请注意,如果您只依赖 std::swap 完成这项工作,锁定的粒度将是错误的,在 std::swap 将在内部执行的三个动作之间锁定和解锁。

确实,考虑 swap 可以让您深入了解 API 您可能需要提供 "thread-safe" A,这通常不同于"non-thread-safe" API,因为 "locking granularity" 问题。

还要注意防范"self-swap"的需要。 "self-swap" 应该是空操作。如果没有自检,就会递归地锁定同一个互斥体。这也可以在没有自检的情况下通过使用 std::recursive_mutex for MutexType.

来解决

更新

在下面的评论中,Yakk 对必须在复制和移动构造函数中默认构造东西感到非常不高兴(他有一个观点)。如果你对这个问题的感受足够强烈,以至于你愿意为它花费记忆,你可以这样避免它:

  • 添加您需要的任何锁类型作为数据成员。这些成员必须位于受保护的数据之前:

    mutable MutexType mut_;
    ReadLock  read_lock_;
    WriteLock write_lock_;
    // ... other data members ...
    
  • 然后在构造函数(例如复制构造函数)中这样做:

    A(const A& a)
        : read_lock_(a.mut_)
        , field1_(a.field1_)
        , field2_(a.field2_)
    {
        read_lock_.unlock();
    }
    

糟糕,Yakk 在我有机会完成此更新之前删除了他的评论。但他推动这个问题并为这个答案找到解决方案值得称赞。

更新 2

dyp 提出了这个好建议:

    A(const A& a)
        : A(a, ReadLock(a.mut_))
    {}
private:
    A(const A& a, ReadLock rhs_lk)
        : field1_(a.field1_)
        , field2_(a.field2_)
    {}

这是一个颠倒的答案。不要将 "this objects needs to be synchronized" 作为类型的基础嵌入,而是将其注入 任何类型。

您处理同步对象的方式非常不同。一个大问题是您必须担心死锁(锁定多个对象)。它也不应该是您的 "default version of an object":同步对象用于将处于争用状态的对象,您的目标应该是最大程度地减少线程之间的争用,而不是将其扫地出门。

但是同步对象还是有用的。我们可以编写一个 class 在同步中包装任意类型,而不是从同步器继承。既然对象是同步的,用户必须跳过几个环节才能对对象进行操作,但他们不限于对对象进行一些手动编码的有限操作集。他们可以将对对象的多个操作组合成一个,或者对多个对象进行操作。

这是一个围绕任意类型的同步包装器 T:

template<class T>
struct synchronized {
  template<class F>
  auto read(F&& f) const&->std::result_of_t<F(T const&)> {
    return access(std::forward<F>(f), *this);
  }
  template<class F>
  auto read(F&& f) &&->std::result_of_t<F(T&&)> {
    return access(std::forward<F>(f), std::move(*this));
  }
  template<class F>
  auto write(F&& f)->std::result_of_t<F(T&)> {
    return access(std::forward<F>(f), *this);
  }
  // uses `const` ness of Syncs to determine access:
  template<class F, class... Syncs>
  friend auto access( F&& f, Syncs&&... syncs )->
  std::result_of_t< F(decltype(std::forward<Syncs>(syncs).t)...) >
  {
    return access2( std::index_sequence_for<Syncs...>{}, std::forward<F>(f), std::forward<Syncs>(syncs)... );
  };
  synchronized(synchronized const& o):t(o.read([](T const&o){return o;})){}
  synchronized(synchronized && o):t(std::move(o).read([](T&&o){return std::move(o);})){}  
  // special member functions:
  synchronized( T & o ):t(o) {}
  synchronized( T const& o ):t(o) {}
  synchronized( T && o ):t(std::move(o)) {}
  synchronized( T const&& o ):t(std::move(o)) {}
  synchronized& operator=(T const& o) {
    write([&](T& t){
      t=o;
    });
    return *this;
  }
  synchronized& operator=(T && o) {
    write([&](T& t){
      t=std::move(o);
    });
    return *this;
  }
private:
  template<class X, class S>
  static auto smart_lock(S const& s) {
    return std::shared_lock< std::shared_timed_mutex >(s.m, X{});
  }
  template<class X, class S>
  static auto smart_lock(S& s) {
    return std::unique_lock< std::shared_timed_mutex >(s.m, X{});
  }
  template<class L>
  static void lock(L& lockable) {
      lockable.lock();
  }
  template<class...Ls>
  static void lock(Ls&... lockable) {
      std::lock( lockable... );
  }
  template<size_t...Is, class F, class...Syncs>
  friend auto access2( std::index_sequence<Is...>, F&&f, Syncs&&...syncs)->
  std::result_of_t< F(decltype(std::forward<Syncs>(syncs).t)...) >
  {
    auto locks = std::make_tuple( smart_lock<std::defer_lock_t>(syncs)... );
    lock( std::get<Is>(locks)... );
    return std::forward<F>(f)(std::forward<Syncs>(syncs).t ...);
  }

  mutable std::shared_timed_mutex m;
  T t;
};
template<class T>
synchronized< T > sync( T&& t ) {
  return {std::forward<T>(t)};
}

包括 C++14 和 C++1z 功能。

这假定 const 操作是多重的-reader 安全(这是 std 容器所假定的)。

使用看起来像:

synchronized<int> x = 7;
x.read([&](auto&& v){
  std::cout << v << '\n';
});

对于具有同步访问的 int

我建议不要 synchronized(synchronized const&)。很少需要它。

如果你需要synchronized(synchronized const&),我很想用std::aligned_storage代替T t;,允许手动放置构建,并进行手动销毁。这样可以进行适当的生命周期管理。

除此之外,我们可以复制源代码 T,然后从中读取:

synchronized(synchronized const& o):
  t(o.read(
    [](T const&o){return o;})
  )
{}
synchronized(synchronized && o):
  t(std::move(o).read(
    [](T&&o){return std::move(o);})
  )
{}

作业:

synchronized& operator=(synchronized const& o) {
  access([](T& lhs, T const& rhs){
    lhs = rhs;
  }, *this, o);
  return *this;
}
synchronized& operator=(synchronized && o) {
  access([](T& lhs, T&& rhs){
    lhs = std::move(rhs);
  }, *this, std::move(o));
  return *this;
}
friend void swap(synchronized& lhs, synchronized& rhs) {
  access([](T& lhs, T& rhs){
    using std::swap;
    swap(lhs, rhs);
  }, *this, o);
}

放置和对齐存储版本有点混乱。大多数对 t 的访问都将被成员函数 T&t()T const&t()const 取代,除非在构建过程中您必须跳过一些障碍。

通过使 synchronized 成为包装器而不是 class 的一部分,我们必须确保 class 在内部将 const 视为多重-reader,单线程写的

罕见的情况下,我们需要一个同步实例,我们像上面一样跳过箍。

对于以上任何错别字,我们深表歉意。应该有一些吧。

上述的一个附带好处是对 synchronized 对象(相同类型)的 n 元任意操作一起工作,而无需事先对其进行硬编码。添加友元声明,多种类型的 n-ary synchronized 对象可能一起工作。在这种情况下,我可能不得不 access 不再是内联好友以处理过载冲突。

live example