用 32 位原子实现 64 位原子计数器
Implementing 64 bit atomic counter with 32 bit atomics
我想从原子 uint32 拼凑出一个 uint64 原子计数器。计数器有一个 writer 和多个 readers。编写器是一个信号处理程序,因此它不能阻塞。
我的想法是使用低位的生成计数作为读锁。 reader 重试,直到生成计数在整个读取过程中稳定,并且低位未设置。
以下代码在内存排序的设计和使用上是否正确?有没有更好的方法?
using namespace std;
class counter {
atomic<uint32_t> lo_{};
atomic<uint32_t> hi_{};
atomic<uint32_t> gen_{};
uint64_t read() const {
auto acquire = memory_order_acquire;
uint32_t lo, hi, gen1, gen2;
do {
gen1 = gen_.load(acquire);
lo = lo_.load(acquire);
hi = hi_.load(acquire);
gen2 = gen_.load(acquire);
} while (gen1 != gen2 || (gen1 & 1));
return (uint64_t(hi) << 32) | lo;
}
void increment() {
auto release = memory_order_release;
gen_.fetch_add(1, release);
uint32_t newlo = 1 + lo_.fetch_add(1, release);
if (newlo == 0) {
hi_.fetch_add(1, release);
}
gen_.fetch_add(1, release);
}
};
编辑:糟糕,已修复 auto acquire = memory_order_release;
这是一种已知模式,称为 SeqLock。 https://en.wikipedia.org/wiki/Seqlock。 (简化后只有一个作者,因此不需要额外支持排除同时作者。)
您不需要也不希望计数器变量本身的增量使用原子 RMW 操作。您可以只用原子 32 位加载来加载两半,递增它,然后原子地存储结果。 (使用便宜的 relaxed
或 release
内存顺序,并使用 release
存储进行第二次计数器更新)。
同样,计数器也不需要是原子 RMW。
作者只需要纯加载和纯商店,只有发布顺序,这比原子 RMW 或 seq_cst 顺序 的商店便宜(很多):
- 以任意顺序加载计数器和值
- 存储一个新计数器(旧+1)
- 存储新值(或者如果你想在没有进位的情况下分支,则只更新低半部分)
- 存储最终计数器。
这 3 个要点中的商店顺序是唯一重要的事情。在第一个存储之后的写栅栏可能很好,因为我们真的不希望在 CPU 上对值 release
的两半进行 both 存储的成本比放松还贵
不幸的是,为了满足 C++ 规则,value
必须是 atomic<T>
,这使得让编译器生成最有效的代码来加载两半变得不方便。例如ARM ldp
/ stp
加载对可能不是原子的,但这并不重要。 (并且编译器通常不会将两个独立的原子 32 位加载优化为一个更宽的加载。)
序列计数器为奇数时其他线程读取的值无关紧要,但我们希望避免未定义的行为。也许我们可以使用 volatile uint64_t
和 atomic<uint64_t>
的并集
我为 写了这个 C++ SeqLock<class T>
模板,我没有写完答案(找出哪些版本的 ARM 有 64 位原子加载和存储)。
这会尝试检查目标是否已经支持 atomic<T>
上的无锁原子操作,以阻止您在它毫无意义时使用它。 (通过定义 IGNORE_SIZECHECK
来禁用它以进行测试。)
我为 T
提供了一个 inc()
函数,它支持 ++
运算符。 TODO 将是一个 apply()
,它接受一个 lambda 对 T
做一些事情,并在序列计数器更新之间存储结果。
// **UNTESTED**
#include <atomic>
#ifdef UNIPROCESSOR
// all readers and writers run on the same core
// ordering instructions at compile time is all that's necessary
#define ATOMIC_FENCE std::atomic_signal_fence
#else
// A reader can be running on another core while writing
// memory barriers or ARMv8 acquire / release loads / store are needed
#define ATOMIC_FENCE std::atomic_thread_fence
#endif
// using fences instead of .store(std::memory_order_release) will stop the compiler
// from taking advantage of a release-store instruction, like on AArch64 or x86
// SINGLE WRITER only.
// uses volatile + barriers for the data itself, like pre-C++11
template <class T>
class SeqLocked
{
#ifndef IGNORE_SIZECHECK
// sizeof(T) > sizeof(unsigned)
static_assert(!std::atomic<T>::is_always_lock_free, "A Seq Lock with a type small enough to be atomic on its own is totally pointless, and we don't have a specialization that replaces it with a straight wrapper for atomic<T>");
#endif
// C++17 doesn't have a good way to express a load that doesn't care about tearing
// without explicitly writing it as multiple small parts and thus gimping the compiler if it can use larger loads
volatile T data; // volatile should be fine on any implementation where pre-C++11 lockless code was possible with volatile,
// even though Data Race UB does apply to volatile variables in ISO C++11 and later.
std::atomic<unsigned> seqcount{0}; // Even means valid, odd means modification in progress.
// unsigned wraps around at a power of 2 on overflow
public:
T get() const {
unsigned c0, c1;
T tmp;
do {
c0 = seqcount.load(std::memory_order_relaxed); // or this can be a std::memory_order_acquire for multicore so AArch64 can use LDAR
ATOMIC_FENCE(std::memory_order_acquire);
tmp = (T)data; // load
ATOMIC_FENCE(std::memory_order_acquire); // LoadLoad barrier
c1 = seqcount.load(std::memory_order_relaxed);
} while(c0&1 || c0 != c1); // retry if the counter changed or is odd
return tmp;
}
// TODO: a version of this that takes a lambda for the operation on tmp
T inc() {
unsigned orig_count = seqcount.load(std::memory_order_relaxed);
seqcount.store(orig_count+1, std::memory_order_relaxed);
ATOMIC_FENCE(std::memory_order_release);
// make sure the data stores appear after the first counter update.
T tmp = data; // load
++tmp;
data = tmp; // store
ATOMIC_FENCE(std::memory_order_release);
seqcount.store(orig_count+2, std::memory_order_relaxed); // Or use mo_release here, better on AArch64
return tmp;
}
void set(T newval) {
unsigned orig_count = seqcount.load(std::memory_order_relaxed);
seqcount.store(orig_count+1, std::memory_order_relaxed);
ATOMIC_FENCE(std::memory_order_release);
// make sure the data stores appear after the first counter update.
data = newval; // store
ATOMIC_FENCE(std::memory_order_release);
seqcount.store(orig_count+2, std::memory_order_relaxed); // Or use mo_release here, better on AArch64
}
};
/***** test callers *******/
#include <stdint.h>
struct sixteenbyte {
//unsigned arr[4];
unsigned long a,b,c,d;
sixteenbyte() = default;
sixteenbyte(const volatile sixteenbyte &old)
: a(old.a), b(old.b), c(old.c), d(old.d) {}
//arr(old.arr) {}
};
void test_inc(SeqLocked<uint64_t> &obj) { obj.inc(); }
sixteenbyte test_get(SeqLocked<sixteenbyte> &obj) { return obj.get(); }
//void test_set(SeqLocked<sixteenbyte> &obj, sixteenbyte val) { obj.set(val); }
uint64_t test_get(SeqLocked<uint64_t> &obj) {
return obj.get();
}
// void atomic_inc_u64_seq_cst(std::atomic<uint64_t> &a) { ++a; }
uint64_t u64_inc_relaxed(std::atomic<uint64_t> &a) {
// same but without dmb barriers
return 1 + a.fetch_add(1, std::memory_order_relaxed);
}
uint64_t u64_load_relaxed(std::atomic<uint64_t> &a) {
// gcc uses LDREXD, not just LDRD?
return a.load(std::memory_order_relaxed);
}
void u64_store_relaxed(std::atomic<uint64_t> &a, uint64_t val) {
// gcc uses a LL/SC retry loop even for a pure store?
a.store(val, std::memory_order_relaxed);
}
它编译成我们想要的 asm on the Godbolt compiler explorer 用于 ARM 和其他 ISA。至少 int64_t;由于繁琐的 volatile
规则,较大的结构类型可能被复制的效率较低。
它使用非原子 volatile T data
作为共享数据。这在技术上是数据竞争未定义的行为,但我们在实践中使用的所有编译器都适用于 C++11 之前的多线程访问 volatile
对象。在 C++11 之前,人们甚至依赖于某些大小的原子性。我们做 而不是 ,我们检查计数器并仅在没有并发写入时使用我们读取的值。 (这就是 SeqLock 的全部意义所在。)
volatile T data
的一个问题是,在 ISO C++ 中,T foo = data
不会为结构对象编译,除非您提供来自 volatile
对象的复制构造函数,例如
sixteenbyte(const volatile sixteenbyte &old)
: a(old.a), b(old.b), c(old.c), d(old.d) {}
这对我们来说真的很烦人,因为我们不关心如何读取内存的细节,只是没有将多次读取优化为一次。
volatile
在这里确实是错误的工具,而简单的 T data
具有足够的防护以确保读取实际上发生在原子读取之间柜台会更好。例如我们可以在 GNU C 中使用 asm("":::"memory");
编译器屏障来防止重新排序 before/after 访问。这将使编译器使用 SIMD 向量或其他任何东西复制更大的对象,而单独的 volatile
访问不会这样做。
我认为 std::atomic_thread_fence(mo_acquire)
也是一个足够的障碍,但我不是 100% 确定。
在 ISO C 中,您可以复制一个 volatile
聚合(结构),编译器将发出它通常会复制那么多字节的任何 asm。但是在C++中,我们显然不能拥有美好的事物。
我想从原子 uint32 拼凑出一个 uint64 原子计数器。计数器有一个 writer 和多个 readers。编写器是一个信号处理程序,因此它不能阻塞。
我的想法是使用低位的生成计数作为读锁。 reader 重试,直到生成计数在整个读取过程中稳定,并且低位未设置。
以下代码在内存排序的设计和使用上是否正确?有没有更好的方法?
using namespace std;
class counter {
atomic<uint32_t> lo_{};
atomic<uint32_t> hi_{};
atomic<uint32_t> gen_{};
uint64_t read() const {
auto acquire = memory_order_acquire;
uint32_t lo, hi, gen1, gen2;
do {
gen1 = gen_.load(acquire);
lo = lo_.load(acquire);
hi = hi_.load(acquire);
gen2 = gen_.load(acquire);
} while (gen1 != gen2 || (gen1 & 1));
return (uint64_t(hi) << 32) | lo;
}
void increment() {
auto release = memory_order_release;
gen_.fetch_add(1, release);
uint32_t newlo = 1 + lo_.fetch_add(1, release);
if (newlo == 0) {
hi_.fetch_add(1, release);
}
gen_.fetch_add(1, release);
}
};
编辑:糟糕,已修复 auto acquire = memory_order_release;
这是一种已知模式,称为 SeqLock。 https://en.wikipedia.org/wiki/Seqlock。 (简化后只有一个作者,因此不需要额外支持排除同时作者。)
您不需要也不希望计数器变量本身的增量使用原子 RMW 操作。您可以只用原子 32 位加载来加载两半,递增它,然后原子地存储结果。 (使用便宜的 relaxed
或 release
内存顺序,并使用 release
存储进行第二次计数器更新)。
同样,计数器也不需要是原子 RMW。
作者只需要纯加载和纯商店,只有发布顺序,这比原子 RMW 或 seq_cst 顺序 的商店便宜(很多):
- 以任意顺序加载计数器和值
- 存储一个新计数器(旧+1)
- 存储新值(或者如果你想在没有进位的情况下分支,则只更新低半部分)
- 存储最终计数器。
这 3 个要点中的商店顺序是唯一重要的事情。在第一个存储之后的写栅栏可能很好,因为我们真的不希望在 CPU 上对值 release
的两半进行 both 存储的成本比放松还贵
不幸的是,为了满足 C++ 规则,value
必须是 atomic<T>
,这使得让编译器生成最有效的代码来加载两半变得不方便。例如ARM ldp
/ stp
加载对可能不是原子的,但这并不重要。 (并且编译器通常不会将两个独立的原子 32 位加载优化为一个更宽的加载。)
序列计数器为奇数时其他线程读取的值无关紧要,但我们希望避免未定义的行为。也许我们可以使用 volatile uint64_t
和 atomic<uint64_t>
我为 SeqLock<class T>
模板,我没有写完答案(找出哪些版本的 ARM 有 64 位原子加载和存储)。
这会尝试检查目标是否已经支持 atomic<T>
上的无锁原子操作,以阻止您在它毫无意义时使用它。 (通过定义 IGNORE_SIZECHECK
来禁用它以进行测试。)
我为 T
提供了一个 inc()
函数,它支持 ++
运算符。 TODO 将是一个 apply()
,它接受一个 lambda 对 T
做一些事情,并在序列计数器更新之间存储结果。
// **UNTESTED**
#include <atomic>
#ifdef UNIPROCESSOR
// all readers and writers run on the same core
// ordering instructions at compile time is all that's necessary
#define ATOMIC_FENCE std::atomic_signal_fence
#else
// A reader can be running on another core while writing
// memory barriers or ARMv8 acquire / release loads / store are needed
#define ATOMIC_FENCE std::atomic_thread_fence
#endif
// using fences instead of .store(std::memory_order_release) will stop the compiler
// from taking advantage of a release-store instruction, like on AArch64 or x86
// SINGLE WRITER only.
// uses volatile + barriers for the data itself, like pre-C++11
template <class T>
class SeqLocked
{
#ifndef IGNORE_SIZECHECK
// sizeof(T) > sizeof(unsigned)
static_assert(!std::atomic<T>::is_always_lock_free, "A Seq Lock with a type small enough to be atomic on its own is totally pointless, and we don't have a specialization that replaces it with a straight wrapper for atomic<T>");
#endif
// C++17 doesn't have a good way to express a load that doesn't care about tearing
// without explicitly writing it as multiple small parts and thus gimping the compiler if it can use larger loads
volatile T data; // volatile should be fine on any implementation where pre-C++11 lockless code was possible with volatile,
// even though Data Race UB does apply to volatile variables in ISO C++11 and later.
std::atomic<unsigned> seqcount{0}; // Even means valid, odd means modification in progress.
// unsigned wraps around at a power of 2 on overflow
public:
T get() const {
unsigned c0, c1;
T tmp;
do {
c0 = seqcount.load(std::memory_order_relaxed); // or this can be a std::memory_order_acquire for multicore so AArch64 can use LDAR
ATOMIC_FENCE(std::memory_order_acquire);
tmp = (T)data; // load
ATOMIC_FENCE(std::memory_order_acquire); // LoadLoad barrier
c1 = seqcount.load(std::memory_order_relaxed);
} while(c0&1 || c0 != c1); // retry if the counter changed or is odd
return tmp;
}
// TODO: a version of this that takes a lambda for the operation on tmp
T inc() {
unsigned orig_count = seqcount.load(std::memory_order_relaxed);
seqcount.store(orig_count+1, std::memory_order_relaxed);
ATOMIC_FENCE(std::memory_order_release);
// make sure the data stores appear after the first counter update.
T tmp = data; // load
++tmp;
data = tmp; // store
ATOMIC_FENCE(std::memory_order_release);
seqcount.store(orig_count+2, std::memory_order_relaxed); // Or use mo_release here, better on AArch64
return tmp;
}
void set(T newval) {
unsigned orig_count = seqcount.load(std::memory_order_relaxed);
seqcount.store(orig_count+1, std::memory_order_relaxed);
ATOMIC_FENCE(std::memory_order_release);
// make sure the data stores appear after the first counter update.
data = newval; // store
ATOMIC_FENCE(std::memory_order_release);
seqcount.store(orig_count+2, std::memory_order_relaxed); // Or use mo_release here, better on AArch64
}
};
/***** test callers *******/
#include <stdint.h>
struct sixteenbyte {
//unsigned arr[4];
unsigned long a,b,c,d;
sixteenbyte() = default;
sixteenbyte(const volatile sixteenbyte &old)
: a(old.a), b(old.b), c(old.c), d(old.d) {}
//arr(old.arr) {}
};
void test_inc(SeqLocked<uint64_t> &obj) { obj.inc(); }
sixteenbyte test_get(SeqLocked<sixteenbyte> &obj) { return obj.get(); }
//void test_set(SeqLocked<sixteenbyte> &obj, sixteenbyte val) { obj.set(val); }
uint64_t test_get(SeqLocked<uint64_t> &obj) {
return obj.get();
}
// void atomic_inc_u64_seq_cst(std::atomic<uint64_t> &a) { ++a; }
uint64_t u64_inc_relaxed(std::atomic<uint64_t> &a) {
// same but without dmb barriers
return 1 + a.fetch_add(1, std::memory_order_relaxed);
}
uint64_t u64_load_relaxed(std::atomic<uint64_t> &a) {
// gcc uses LDREXD, not just LDRD?
return a.load(std::memory_order_relaxed);
}
void u64_store_relaxed(std::atomic<uint64_t> &a, uint64_t val) {
// gcc uses a LL/SC retry loop even for a pure store?
a.store(val, std::memory_order_relaxed);
}
它编译成我们想要的 asm on the Godbolt compiler explorer 用于 ARM 和其他 ISA。至少 int64_t;由于繁琐的 volatile
规则,较大的结构类型可能被复制的效率较低。
它使用非原子 volatile T data
作为共享数据。这在技术上是数据竞争未定义的行为,但我们在实践中使用的所有编译器都适用于 C++11 之前的多线程访问 volatile
对象。在 C++11 之前,人们甚至依赖于某些大小的原子性。我们做 而不是 ,我们检查计数器并仅在没有并发写入时使用我们读取的值。 (这就是 SeqLock 的全部意义所在。)
volatile T data
的一个问题是,在 ISO C++ 中,T foo = data
不会为结构对象编译,除非您提供来自 volatile
对象的复制构造函数,例如
sixteenbyte(const volatile sixteenbyte &old)
: a(old.a), b(old.b), c(old.c), d(old.d) {}
这对我们来说真的很烦人,因为我们不关心如何读取内存的细节,只是没有将多次读取优化为一次。
volatile
在这里确实是错误的工具,而简单的 T data
具有足够的防护以确保读取实际上发生在原子读取之间柜台会更好。例如我们可以在 GNU C 中使用 asm("":::"memory");
编译器屏障来防止重新排序 before/after 访问。这将使编译器使用 SIMD 向量或其他任何东西复制更大的对象,而单独的 volatile
访问不会这样做。
我认为 std::atomic_thread_fence(mo_acquire)
也是一个足够的障碍,但我不是 100% 确定。
在 ISO C 中,您可以复制一个 volatile
聚合(结构),编译器将发出它通常会复制那么多字节的任何 asm。但是在C++中,我们显然不能拥有美好的事物。