用 32 位原子实现 64 位原子计数器

Implementing 64 bit atomic counter with 32 bit atomics

我想从原子 uint32 拼凑出一个 uint64 原子计数器。计数器有一个 writer 和多个 readers。编写器是一个信号处理程序,因此它不能阻塞。

我的想法是使用低位的生成计数作为读锁。 reader 重试,直到生成计数在整个读取过程中稳定,并且低位未设置。

以下代码在内存排序的设计和使用上是否正确?有没有更好的方法?

using namespace std;
class counter {
    atomic<uint32_t> lo_{};
    atomic<uint32_t> hi_{};
    atomic<uint32_t> gen_{};

    uint64_t read() const {
        auto acquire = memory_order_acquire;
        uint32_t lo, hi, gen1, gen2;
        do {
            gen1 = gen_.load(acquire);
            lo = lo_.load(acquire);
            hi = hi_.load(acquire);
            gen2 = gen_.load(acquire);
        } while (gen1 != gen2 || (gen1 & 1));
        return (uint64_t(hi) << 32) | lo;
    }

    void increment() {
        auto release = memory_order_release;
        gen_.fetch_add(1, release);
        uint32_t newlo = 1 + lo_.fetch_add(1, release);
        if (newlo == 0) {
            hi_.fetch_add(1, release);
        }
        gen_.fetch_add(1, release);
    }
};

编辑:糟糕,已修复 auto acquire = memory_order_release;

这是一种已知模式,称为 SeqLock。 https://en.wikipedia.org/wiki/Seqlock。 (简化后只有一个作者,因此不需要额外支持排除同时作者。)

您不需要也不希望计数器变量本身的增量使用原子 RMW 操作。您可以只用原子 32 位加载来加载两半,递增它,然后原子地存储结果。 (使用便宜的 relaxedrelease 内存顺序,并使用 release 存储进行第二次计数器更新)。

同样,计数器也不需要是原子 RMW。

作者只需要纯加载和纯商店,只有发布顺序,这比原子 RMW 或 seq_cst 顺序 的商店便宜(很多):

  • 以任意顺序加载计数器和值
  • 存储一个新计数器(旧+1)
  • 存储新值(或者如果你想在没有进位的情况下分支,则只更新低半部分)
  • 存储最终计数器。

这 3 个要点中的商店顺序是唯一重要的事情。在第一个存储之后的写栅栏可能很好,因为我们真的不希望在 CPU 上对值 release 的两半进行 both 存储的成本比放松还贵


不幸的是,为了满足 C++ 规则,value 必须是 atomic<T>,这使得让编译器生成最有效的代码来加载两半变得不方便。例如ARM ldp / stp 加载对可能不是原子的,但这并不重要。 (并且编译器通常不会将两个独立的原子 32 位加载优化为一个更宽的加载。)

序列计数器为奇数时其他线程读取的值无关紧要,但我们希望避免未定义的行为。也许我们可以使用 volatile uint64_tatomic<uint64_t>

的并集

我为 写了这个 C++ SeqLock<class T> 模板,我没有写完答案(找出哪些版本的 ARM 有 64 位原子加载和存储)。

这会尝试检查目标是否已经支持 atomic<T> 上的无锁原子操作,以阻止您在它毫无意义时使用它。 (通过定义 IGNORE_SIZECHECK 来禁用它以进行测试。)

我为 T 提供了一个 inc() 函数,它支持 ++ 运算符。 TODO 将是一个 apply(),它接受一个 lambda 对 T 做一些事情,并在序列计数器更新之间存储结果。

// **UNTESTED**

#include <atomic>

#ifdef UNIPROCESSOR
// all readers and writers run on the same core
// ordering instructions at compile time is all that's necessary
#define ATOMIC_FENCE std::atomic_signal_fence
#else
// A reader can be running on another core while writing
// memory barriers or ARMv8 acquire / release loads / store are needed
#define ATOMIC_FENCE std::atomic_thread_fence
#endif
// using fences instead of .store(std::memory_order_release) will stop the compiler
// from taking advantage of a release-store instruction, like on AArch64 or x86


// SINGLE WRITER only.
// uses volatile + barriers for the data itself, like pre-C++11
template <class T>
class SeqLocked
{
#ifndef IGNORE_SIZECHECK
    // sizeof(T) > sizeof(unsigned)
    static_assert(!std::atomic<T>::is_always_lock_free, "A Seq Lock with a type small enough to be atomic on its own is totally pointless, and we don't have a specialization that replaces it with a straight wrapper for atomic<T>");
#endif

       // C++17 doesn't have a good way to express a load that doesn't care about tearing
       //  without explicitly writing it as multiple small parts and thus gimping the compiler if it can use larger loads
    volatile T data;          // volatile should be fine on any implementation where pre-C++11 lockless code was possible with volatile,
                              //  even though Data Race UB does apply to volatile variables in ISO C++11 and later.

    std::atomic<unsigned> seqcount{0};  // Even means valid, odd means modification in progress.
                                        //  unsigned wraps around at a power of 2 on overflow

public:
    T get() const {
        unsigned c0, c1;
        T tmp;

        do {
            c0 = seqcount.load(std::memory_order_relaxed);  // or this can be a std::memory_order_acquire for multicore so AArch64 can use LDAR
            ATOMIC_FENCE(std::memory_order_acquire);

            tmp = (T)data;       // load

            ATOMIC_FENCE(std::memory_order_acquire);  // LoadLoad barrier
            c1 = seqcount.load(std::memory_order_relaxed);
        } while(c0&1 || c0 != c1);     // retry if the counter changed or is odd

        return tmp;
    }

    // TODO: a version of this that takes a lambda for the operation on tmp
    T inc() {
        unsigned orig_count = seqcount.load(std::memory_order_relaxed);

        seqcount.store(orig_count+1, std::memory_order_relaxed);
        ATOMIC_FENCE(std::memory_order_release);
        // make sure the data stores appear after the first counter update.

        T tmp = data;  // load
        ++tmp;
        data = tmp;    // store

        ATOMIC_FENCE(std::memory_order_release);
        seqcount.store(orig_count+2, std::memory_order_relaxed);  // Or use mo_release here, better on AArch64

        return tmp;
    }

    void set(T newval) {
        unsigned orig_count = seqcount.load(std::memory_order_relaxed);

        seqcount.store(orig_count+1, std::memory_order_relaxed);
        ATOMIC_FENCE(std::memory_order_release);
        // make sure the data stores appear after the first counter update.

        data = newval;    // store

        ATOMIC_FENCE(std::memory_order_release);
        seqcount.store(orig_count+2, std::memory_order_relaxed);  // Or use mo_release here, better on AArch64
    }

};


/***** test callers *******/
#include <stdint.h>

struct sixteenbyte {
    //unsigned arr[4];
    unsigned long  a,b,c,d;
    sixteenbyte() = default;
    sixteenbyte(const volatile sixteenbyte &old)
         : a(old.a), b(old.b), c(old.c), d(old.d) {}
    //arr(old.arr) {}
};

void test_inc(SeqLocked<uint64_t> &obj) {  obj.inc(); }
sixteenbyte test_get(SeqLocked<sixteenbyte> &obj) { return obj.get(); }
//void test_set(SeqLocked<sixteenbyte> &obj, sixteenbyte val) { obj.set(val); }

uint64_t test_get(SeqLocked<uint64_t> &obj) {
    return obj.get();
}

// void atomic_inc_u64_seq_cst(std::atomic<uint64_t> &a) { ++a; }
uint64_t u64_inc_relaxed(std::atomic<uint64_t> &a) {
    // same but without dmb barriers
    return 1 + a.fetch_add(1, std::memory_order_relaxed);
}

uint64_t u64_load_relaxed(std::atomic<uint64_t> &a) {
    // gcc uses LDREXD, not just LDRD?
    return a.load(std::memory_order_relaxed);
}

void u64_store_relaxed(std::atomic<uint64_t> &a, uint64_t val) {
    // gcc uses a LL/SC retry loop even for a pure store?
    a.store(val, std::memory_order_relaxed);
}

它编译成我们想要的 asm on the Godbolt compiler explorer 用于 ARM 和其他 ISA。至少 int64_t;由于繁琐的 volatile 规则,较大的结构类型可能被复制的效率较低。

它使用非原子 volatile T data 作为共享数据。这在技术上是数据竞争未定义的行为,但我们在实践中使用的所有编译器都适用于 C++11 之前的多线程访问 volatile 对象。在 C++11 之前,人们甚至依赖于某些大小的原子性。我们做 而不是 ,我们检查计数器并仅在没有并发写入时使用我们读取的值。 (这就是 SeqLock 的全部意义所在。)

volatile T data 的一个问题是,在 ISO C++ 中,T foo = data 不会为结构对象编译,除非您提供来自 volatile 对象的复制构造函数,例如

sixteenbyte(const volatile sixteenbyte &old)
         : a(old.a), b(old.b), c(old.c), d(old.d) {}

这对我们来说真的很烦人,因为我们不关心如何读取内存的细节,只是没有将多次读取优化为一次。

volatile 在这里确实是错误的工具,而简单的 T data 具有足够的防护以确保读取实际上发生在原子读取之间柜台会更好。例如我们可以在 GNU C 中使用 asm("":::"memory"); 编译器屏障来防止重新排序 before/after 访问。这将使编译器使用 SIMD 向量或其他任何东西复制更大的对象,而单独的 volatile 访问不会这样做。

我认为 std::atomic_thread_fence(mo_acquire) 也是一个足够的障碍,但我不是 100% 确定。


在 ISO C 中,您可以复制一个 volatile 聚合(结构),编译器将发出它通常会复制那么多字节的任何 asm。但是在C++中,我们显然不能拥有美好的事物。