如何使用 C++11 CAS 实现 ABA 计数器?

How can I implement ABA counter with c++11 CAS?

我正在基于这个algorithm实现一个无锁队列,它使用计数器来解决ABA问题。但我不知道如何用 c++11 CAS 实现这个计数器。例如,从算法:

E9:    if CAS(&tail.ptr->next, next, <node, next.count+1>)

是原子操作,意思是如果tail.ptr->next等于next,让tail.ptr->next同时(原子地)指向node 使 next.count+1。然而,使用C++11 CAS,我只能实现:

std::atomic_compare_exchange_weak(&tail.ptr->next, next, node);

不能使next.count+1同时发生。

要通过单个原子操作一次原子地修改两个事物,您需要将它们放在相邻的内存中,例如在两个成员的结构中。然后,您可以使用 std::atomic<my_struct> 让 gcc 在 x86-64 上发出 lock cmpxchg16b,例如。

为此您不需要内联 asm,为了避免它,花点 C++ 语法的痛苦是值得的。 https://gcc.gnu.org/wiki/DontUseInlineAsm.

不幸的是,对于当前的编译器,您需要使用 union 来获得仅读取其中一个的高效代码。对结构进行原子加载然后仅使用一个成员的“明显”方式仍然导致 lock cmpxchg16b 读取整个结构,即使我们只需要一个成员。 (慢得多,并且会弄脏缓存行,因此读者会与其他读者竞争)。我相信指针的正常 64b 加载仍然会在 x86 上正确实现获取内存排序语义(以及原子性),但当前的编译器甚至不会对 std::memory_order_relaxed 进行优化,所以我们用工会把他们骗进去。

(已提交 GCC bug 80835 关于此内容。TODO:如果这是一个有用的想法,则对 clang 也是如此。)


清单:

  • 确保您的编译器生成有效代码以在只读情况下仅加载一个成员,而不是 lock cmpxchg16b 对。例如使用联合。

  • 确保您的编译器保证在编写不同的联合成员后访问联合的一个成员在该实现中具有明确定义的行为。 Union 类型双关在 C99 中是合法的(因此这应该适用于 C11 stdatomic),但它在 ISO C++11 中是 UB。但是,它在 C++ 的 GNU 方言中是合法的(由 gcc、clang 和 ICC 等支持)。

  • 确保您的对象是 16B 对齐的,或者对于 32 位指针是 8B 对齐的。更一般地说,alignas(2*sizeof(void*)) 应该有效。未对齐的 locked 指令在 x86 上可能 非常 慢,尤其是当它们跨越缓存行边界时。如果对象未对齐,clang3.8 甚至会将其编译为库调用。

  • 使用 -mcx16 编译 x86-64 构建。 cmpxchg16b 不受最早的 x86-64 CPU (AMD K8) 支持,但之后的所有东西都应该支持。如果没有 -mcx16,您会得到一个库函数调用(可能使用全局锁)。 32 位等价物 cmpxchg8b 已经足够古老,现代编译器假定支持它。 (并且可以使用 SSE、MMX 甚至 x87 来执行 64b 原子 loads/stores,因此在读取一个成员时使用联合对于良好的性能来说不太重要)。

  • 确保指针+uintptr_t原子对象是无锁的。这对于 x32 和 32 位 ABI(8B 对象)几乎是有保证的,但对于 16B 对象则不然。例如MSVC 使用 x86-64 的锁。

    gcc7 及更高版本将调用 libatomic 而不是内联 lock cmpxchg16b,并且将从 atomic_is_lock_free (for reasons including that it's so slow it's not what users expect is_lock_free to mean) return false,但至少现在是 libatomic 实现仍然在该指令可用的目标上使用 lock cmpxchg16b。 (它甚至可以为只读原子对象发生段错误,所以它确实不理想。更重要的是,读者与其他读者争夺对缓存行的独占访问权。这就是为什么我们要这么麻烦地避免 lock cmpxchg16b 当我们只想要一个 8 字节的一半时这里的读取端。)


这是一个带有 CAS 重试循环的代码示例,它编译成看起来正确的 asm,我认为没有 UB 或其他不安全的 C++ 实现允许联合类型双关。它是用 C 风格编写的(非成员函数等),但是如果您编写成员函数,它会是一样的。

查看带有 asm 输出的代码 from gcc6.3 on the Godbolt compiler explorer。对于 -m32,它使用 cmpxchg8b 的方式与 64 位代码使用 cmpxchg16b 的方式相同。使用 -mx32(长模式下的 32 位指针),它可以简单地使用 64 位 cmpxchg 和正常的 64 位整数加载来在一次原子加载中获取两个成员。

这是可移植的 C++11(联合类型双关除外),没有任何特定于 x86 的内容。不过,它仅在可以 CAS 两个指针大小的对象的目标上 有效 例如它编译为对 ARM / ARM64 和 MIPS64 的 __atomic_compare_exchange_16 库函数的调用,正如您在 Godbolt 上看到的那样。

它不能在 MSVC 上编译,其中 atomic<counted_ptr> 大于 counted_ptr_separate,因此 static_assert 会捕获它。大概 MSVC 在原子对象中包含一个锁成员。

#include <atomic>
#include <stdint.h>

using namespace std;

struct node {
  // This alignas is essential for clang to use cmpxchg16b instead of a function call
  // Apparently just having it on the union member isn't enough.
  struct alignas(2*sizeof(node*)) counted_ptr {
    node *    ptr;
    uintptr_t count;  // use pointer-sized integers to avoid padding
  };
  
  // hack to allow reading just the pointer without lock-cmpxchg16b,
  // but still without any C++ data race
  struct counted_ptr_separate {
    atomic<node *>    ptr;
    atomic<uintptr_t> count_separate;  // var name emphasizes that accessing this way isn't atomic with ptr
  };

  static_assert(sizeof(atomic<counted_ptr>) == sizeof(counted_ptr_separate), "atomic<counted_ptr> isn't the same size as the separate version; union type-punning will be bogus");
  //static_assert(std::atomic<counted_ptr>{}.is_lock_free());
  
  union {  // anonymous union: the members are directly part of struct node
    alignas(2*sizeof(node*)) atomic<counted_ptr> next_and_count;
    counted_ptr_separate  next;
  };
  // TODO: write member functions to read next.ptr or read/write next_and_count

  int data[4];
};


// make sure read-only access is efficient.
node *follow(node *p) {   // good asm, just a mov load
  return p->next.ptr.load(memory_order_acquire);
}
node *follow_nounion(node *p) {  // really bad asm, using cmpxchg16b to load the whole thing
  return p->next_and_count.load(memory_order_acquire).ptr;
}


void update_next(node &target, node *desired)
{
  // read the old value efficiently to avoid overhead for the no-contention case
  // tearing (or stale data from a relaxed load) will just lead to a retry
  node::counted_ptr expected = {
      target.next.ptr.load(memory_order_relaxed),
      target.next.count_separate.load(memory_order_relaxed) };

  bool success;
  do {
    node::counted_ptr newval = { desired, expected.count + 1 };
    // x86-64: compiles to cmpxchg16b
    success = target.next_and_count.compare_exchange_weak(
                           expected, newval, memory_order_acq_rel);
    // updates exected on failure
  } while( !success );
}

clang 4.0 -O3 -mcx16 的 asm 输出是:

update_next(node&, node*):
    push    rbx             # cmpxchg16b uses rbx implicitly so it has to be saved/restored
    mov     rbx, rsi
    mov     rax, qword ptr [rdi]     # load the pointer
    mov     rdx, qword ptr [rdi + 8] # load the counter
.LBB2_1:                        # =>This Inner Loop Header: Depth=1
    lea     rcx, [rdx + 1]
    lock
    cmpxchg16b      xmmword ptr [rdi]
    jne     .LBB2_1
    pop     rbx
    ret

gcc 做了一些笨拙的事情 store/reloads,但基本上是相同的逻辑。

follow(node*) 编译为 mov rax, [rdi] / ret,因此对指针的只读访问是应该的,多亏了 union hack。


它确实依赖于通过一个成员编写一个联合并通过另一个成员读取它,从而在不使用 lock cmpxchg16b 的情况下仅对指针进行高效读取。这保证可以在 GNU C++(和 ISO C99/C11)中工作,但不能在 ISO C++ 中工作。许多其他 C++ 编译器确实保证联合类型双关有效,但即使没有它,它也可能仍然有效:我们总是使用 std::atomic 加载,它必须假设值是异步修改的。所以我们应该避免类似别名的问题,在通过另一个指针(或联合成员)写入值后,寄存器中的值仍然被认为是有效的。不过,编译器认为独立的事物在编译时重新排序可能是个问题。

仅在指针+计数器的原子 cmpxchg 之后以原子方式读取指针仍应为您提供 x86 上的 acquire/release 语义,但我不认为 ISO C++ 对此有任何说明。 我猜想一个广泛的发布存储(作为 compare_exchange_weak 的一部分将与大多数架构上相同地址的较窄负载同步(就像在 x86 上一样),但是 AFAIK C++ std::atomic 不保证任何类型双关。

与指针 + ABA 计数器无关,但可能会出现在其他使用联合来允许访问较大原子对象的子集的应用程序中:不要使用联合来允许原子存储只是指针或只是计数器。至少如果您关心与一对获取负载的同步,则不会。即使是强序 x86 也可以 。一切仍然是原子的,但就内存排序而言,你进入了一个奇怪的领域。

在 x86-64 上,原子 16B 加载需要 lock cmpxchg16b(这是一个完整的内存屏障,防止前面的窄存储在它之后变得全局可见)。但是,如果将其与 32 位指针(或 32 位数组索引)一起使用,则很容易出现问题,因为两半都可以使用常规 64b 加载进行加载。如果您需要与其他线程同步而不仅仅是原子性,我不知道您会在其他架构上看到什么问题。


要了解有关 std::memory_order 获取和释放 的更多信息,请参阅 Jeff Preshing's excellent articles