无锁同步、栅栏和内存顺序(具有获取语义的存储操作)

lock-free synchronization, fences and memory order (store operation with acquire semantics)

我正在将 运行 的准系统项目迁移到 linux,并且需要消除一些 {disable,enable}_scheduler 调用。 :)

所以我需要一个单写多读场景下的无锁同步解决方案,写线程不能被阻塞。我提出了以下解决方案,它不适合通常的获取-发布顺序:

class RWSync {
    std::atomic<int> version; // incremented after every modification
    std::atomic_bool invalid; // true during write
public:
  RWSync() : version(0), invalid(0) {}
  template<typename F> void sync(F lambda) {
    int currentVersion;
    do {
      do { // wait until the object is valid
        currentVersion = version.load(std::memory_order_acquire);
      } while (invalid.load(std::memory_order_acquire));
      lambda();
      std::atomic_thread_fence(std::memory_order_seq_cst);
      // check if something changed
    } while (version.load(std::memory_order_acquire) != currentVersion
        || invalid.load(std::memory_order_acquire));
  }
  void beginWrite() {
    invalid.store(true, std::memory_order_relaxed);
    std::atomic_thread_fence(std::memory_order_seq_cst);
  }
  void endWrite() {
    std::atomic_thread_fence(std::memory_order_seq_cst);
    version.fetch_add(1, std::memory_order_release);
    invalid.store(false, std::memory_order_release);
  }
}

我希望意图很明确:我将对(非原子)有效载荷的修改包装在 beginWrite/endWrite 之间,并仅在传递给 sync() 的 lambda 函数中读取有效载荷。

如您所见,这里我在 beginWrite() 中有一个原子 store,其中存储操作之后的写入不能在存储之前重新排序。没有找到合适的例子,我也没有这方面的经验,所以想确认一下是可以的(通过测试验证也不容易)。

  1. 此代码是否无竞争且按我预期的方式工作?

  2. 如果我在每个原子操作中都使用std::memory_order_seq_cst,我可以省略栅栏吗? (即使是,我估计性能会更差)

  3. 我可以在 endWrite() 中删除围栏吗?

  4. 我可以在围栏中使用 memory_order_acq_rel 吗?我不太明白其中的区别——单一总订单概念对我来说不是很清楚。

  5. 有没有简化/优化的机会?

+1。我很乐意接受任何更好的想法作为这个 class :)

代码基本正确

您可以使用 单个 version 具有语义的 "Odd values are invalid" 变量,而不是两个原子变量(versioninvalid) ].这被称为 "sequential lock" 机制。

减少原子变量的数量可以大大简化事情:

class RWSync {
    // Incremented before and after every modification.
    // Odd values mean that object in invalid state.
    std::atomic<int> version; 
public:
  RWSync() : version(0) {}
  template<typename F> void sync(F lambda) {
    int currentVersion;
    do {
      currentVersion = version.load(std::memory_order_seq_cst);
      // This may reduce calls to lambda(), nothing more
      if(currentVersion | 1) continue;

      lambda();

      // Repeat until something changed or object is in an invalid state.
    } while ((currentVersion | 1) ||
        version.load(std::memory_order_seq_cst) != currentVersion));
  }
  void beginWrite() {
    // Writer may read version with relaxed memory order
    currentVersion = version.load(std::memory_order_relaxed);
    // Invalidation requires sequential order
    version.store(currentVersion + 1, std::memory_order_seq_cst);
  }
  void endWrite() {
    // Writer may read version with relaxed memory order
    currentVersion = version.load(std::memory_order_relaxed);
    // Release order is sufficient for mark an object as valid
    version.store(currentVersion + 1, std::memory_order_release);
  }
};

注意beginWrite()endWrite()中内存顺序的区别:

  • endWrite() 确保所有 previous 对象的修改已经完成。为此使用 release 内存顺序就足够了。

  • beginWrite() 确保 reader 将在任何 futer 对象的修改开始之前检测到对象处于无效状态。这样的保证需要seq_cst内存顺序。因为 reader 也使用 seq_cst 内存顺序。

至于栅栏,最好将它们合并到previous/futher原子操作中:编译器知道如何快速获得结果。


原代码部分修改说明:

1) fetch_add()这样的原子修改是针对这样的情况,当并发修改(像另一个fetch_add()) 是可能的。为了正确起见,此类修改使用 内存锁定 或其他 非常耗时 特定于体系结构的东西。

原子赋值store())不使用内存锁定,所以比fetch_add()便宜 .您可以使用这样的分配,因为在您的情况下不可能进行并发修改(reader 不会修改 version)。

2) 不同于 release-acquire 语义,它区分 loadstore 操作,顺序一致性 (memory_order_seq_cst) 适用对每个原子访问,并提供这些访问之间的总顺序。

接受的答案不正确。我想代码应该是 "currentVersion & 1" 而不是 "currentVersion | 1"。更微妙的错误是,reader 线程可以进入 lambda(),然后写入线程可以 运行beginWrite() 并将值写入非原子变量。在这种情况下,payload 中的 write action 和 payload 中的 read action 没有 happens-before 关系。对非原子变量的并发访问(没有发生前关系)是一种数据竞争。需要注意的是,memory_order_seq_cst的单个全序并不代表happens-before关系;它们是一致的,但是有两种东西。