原子值的部分比较和完全交换
Partial-compare-and-full-swap for atomic values
问题如下
给定一个包含两部分的 POD 对象:index 和 data。我想对其执行原子条件交换操作,条件是仅检查索引是否相等。
像这样:
struct Data { size_t m_index; char m_data; };
std::atomic<Data> dd; // some initialization
Data zz; // some initialization
// so I want something like this
dd.exchange_if_equals<&Data::m_index>(10,zz);
所以这是一种“部分-比较-完整-交换”操作。也许这将需要 Data::m_index
.
的适当对齐
显然std::atomic
不支持这个,但是是否可以自己实现这个,或者也许有另一个库支持这个?
我想你必须做一个加载,然后是你的自定义条件,然后是比较和交换,比较是当前值完全等于读取值。如果最后一步失败,则循环。
template<class T, class F>
bool swap_if(std::atomic<T>& atomic, T desired, F&& condition) {
for (;;) {
T data = atomic.load();
if (!condition(data)) break;
if (atomic.compare_exchange_weak(data, desired)) return true;
}
return false;
}
http://coliru.stacked-crooked.com/a/a394e336628246a9
由于复杂性,您可能应该只使用互斥锁。
另外,std::atomic<Data>
可能已经在幕后使用了互斥锁,因为 Data
太大了。
如果使用 std::mutex
而不是 atomic
是一个选项,您可以将互斥量放入您自己的类似原子的包装器中。
这是它可能的样子的开始:
#include <iostream>
#include <type_traits>
#include <mutex>
template<typename T>
class myatomic {
public:
static_assert(
// std::is_trivially_copyable_v<T> && // used in std::atomic, not needed here
std::is_copy_constructible_v<T> &&
std::is_move_constructible_v<T> &&
std::is_copy_assignable_v<T> &&
std::is_move_assignable_v<T>, "unsupported type");
using value_type = T;
myatomic() : data{} {}
explicit myatomic(const T& v) : data{v} {}
myatomic(const myatomic& rhs) : myatomic(rhs.load()) {}
myatomic& operator=(const myatomic& rhs) {
std::scoped_lock lock(mtx, rhs.mtx);
data = rhs.data;
return *this;
}
T load() const {
const std::lock_guard<std::mutex> lock(mtx);
return data;
}
operator T() const {
return load();
}
void store(const T& v) {
const std::lock_guard<std::mutex> lock(mtx);
data = v;
}
myatomic& operator=(const T& v) {
store(v);
return *this;
}
// partial compare and full swap
template<typename Mptr, typename V>
bool exchange_if_equals(Mptr mvar, V mval, const T& oval) {
const std::lock_guard<std::mutex> lock(mtx);
if(data.*mvar == mval) {
data = oval;
return true;
}
return false;
}
template<typename Mptr>
auto get(Mptr mvar) const {
const std::lock_guard<std::mutex> lock(mtx);
return data.*mvar;
}
template<typename Mptr, typename V>
void set(Mptr mvar, const V& v) {
const std::lock_guard<std::mutex> lock(mtx);
data.*mvar = v;
}
private:
mutable std::mutex mtx;
T data;
};
struct Data {
size_t m_index;
char m_data;
};
int main() {
Data orig{10, 'a'};
Data zz; // some initialization
myatomic<Data> dd(orig);
dd.exchange_if_equals(&Data::m_index, 10U, zz);
std::cout << dd.get(&Data::m_index);
}
就像 C++ 一样,硬件 CAS(如 x86-64 或 ARMv8.1)在 asm 中不支持此功能,您必须自己动手。
在 C++ 中,这相当简单:加载原始值并替换其中的一部分。如果另一个核心更改了您不想与之比较的其他部分,这当然会引入虚假故障。
如果可能,使用 unsigned m_index
而不是 size_t
,这样整个结构在典型的 64 位机器上可以容纳 8 个字节,而不是 16 个. 16 字节原子在 x86-64 上速度较慢(尤其是纯负载部分),或者在某些实现 and/or 某些 ISA 上什至根本不是无锁的。请参阅 re: x86-64 lock cmpgxchg16b
与当前 GCC/clang.
如果每个 atomic<>
访问单独获取一个锁,那么在整个自定义比较和设置周围使用一个互斥量会好得多。
我写了一个 CAS 尝试(如 cas_weak
)的简单实现作为示例。您可以在 std::atomic<Data>
的模板特化或派生 class 中使用它来为 atomic<Data>
对象提供新的成员函数。
#include <atomic>
struct Data {
// without alignment, clang's atomic<Data> doesn't inline load + CAS?!? even though return d.is_always_lock_free; is true
alignas(long long) char m_data;
unsigned m_index; // this last so compilers can replace it slightly more efficiently
};
inline bool partial_cas_weak(std::atomic<Data> &d, unsigned expected_idx, Data zz, std::memory_order order = std::memory_order_seq_cst)
{
Data expected = d.load(std::memory_order_relaxed);
expected.m_index = expected_idx; // new index, same everything else
return d.compare_exchange_weak(expected, zz, order);
// updated value of "expected" discarded on CAS failure
// If you make this a retry loop, use it instead of repeated d.load
}
这在 x86-64 (Godbolt) 的实践中编译得很好,内联到传递编译时间常量的调用者中 order
(否则 clang 会在该 order
arg 上疯狂分支以获得函数的独立非内联版本)
# clang10.0 -O3 for x86-64
test_pcw(std::atomic<Data>&, unsigned int, Data):
mov rax, qword ptr [rdi] # load the whole thing
shl rsi, 32
mov eax, eax # zero-extend the low 32 bits, clearing m_index
or rax, rsi # OR in a new high half = expected_idx
lock cmpxchg qword ptr [rdi], rdx # the actual 8-byte CAS
sete al # boolean FLAG result into register
ret
不幸的是,编译器太笨了,无法只加载他们实际需要的原子结构部分,而是加载整个部分,然后将他们不需要的部分归零。 (请参阅 了解在某些编译器上解决此问题的联合技巧。)
不幸的是,GCC 使 stores/reloads 临时堆栈的 asm 变得混乱,导致存储转发停顿。 GCC 还会将 char m_data
之后的填充置零(无论它是第一个还是最后一个成员),如果内存中的实际对象具有非零填充,则可能导致 CAS 总是失败。如果纯存储和初始化总是使它为零,那可能是不可能的。
像 ARM 或 PowerPC 这样的 LL/SC machine 可以在汇编中轻松地做到这一点(compare/branch 是手动完成的,在加载链接和存储条件之间),但是没有库公开那个便携。 (最重要的是因为它不能为像 x86 这样的机器编译,并且因为你在 LL/SC 事务中可以做的事情受到严重限制,本地变量的调试模式 spill/reload 可能导致代码总是失败.)
问题如下
给定一个包含两部分的 POD 对象:index 和 data。我想对其执行原子条件交换操作,条件是仅检查索引是否相等。
像这样:
struct Data { size_t m_index; char m_data; };
std::atomic<Data> dd; // some initialization
Data zz; // some initialization
// so I want something like this
dd.exchange_if_equals<&Data::m_index>(10,zz);
所以这是一种“部分-比较-完整-交换”操作。也许这将需要 Data::m_index
.
显然std::atomic
不支持这个,但是是否可以自己实现这个,或者也许有另一个库支持这个?
我想你必须做一个加载,然后是你的自定义条件,然后是比较和交换,比较是当前值完全等于读取值。如果最后一步失败,则循环。
template<class T, class F>
bool swap_if(std::atomic<T>& atomic, T desired, F&& condition) {
for (;;) {
T data = atomic.load();
if (!condition(data)) break;
if (atomic.compare_exchange_weak(data, desired)) return true;
}
return false;
}
http://coliru.stacked-crooked.com/a/a394e336628246a9
由于复杂性,您可能应该只使用互斥锁。
另外,std::atomic<Data>
可能已经在幕后使用了互斥锁,因为 Data
太大了。
如果使用 std::mutex
而不是 atomic
是一个选项,您可以将互斥量放入您自己的类似原子的包装器中。
这是它可能的样子的开始:
#include <iostream>
#include <type_traits>
#include <mutex>
template<typename T>
class myatomic {
public:
static_assert(
// std::is_trivially_copyable_v<T> && // used in std::atomic, not needed here
std::is_copy_constructible_v<T> &&
std::is_move_constructible_v<T> &&
std::is_copy_assignable_v<T> &&
std::is_move_assignable_v<T>, "unsupported type");
using value_type = T;
myatomic() : data{} {}
explicit myatomic(const T& v) : data{v} {}
myatomic(const myatomic& rhs) : myatomic(rhs.load()) {}
myatomic& operator=(const myatomic& rhs) {
std::scoped_lock lock(mtx, rhs.mtx);
data = rhs.data;
return *this;
}
T load() const {
const std::lock_guard<std::mutex> lock(mtx);
return data;
}
operator T() const {
return load();
}
void store(const T& v) {
const std::lock_guard<std::mutex> lock(mtx);
data = v;
}
myatomic& operator=(const T& v) {
store(v);
return *this;
}
// partial compare and full swap
template<typename Mptr, typename V>
bool exchange_if_equals(Mptr mvar, V mval, const T& oval) {
const std::lock_guard<std::mutex> lock(mtx);
if(data.*mvar == mval) {
data = oval;
return true;
}
return false;
}
template<typename Mptr>
auto get(Mptr mvar) const {
const std::lock_guard<std::mutex> lock(mtx);
return data.*mvar;
}
template<typename Mptr, typename V>
void set(Mptr mvar, const V& v) {
const std::lock_guard<std::mutex> lock(mtx);
data.*mvar = v;
}
private:
mutable std::mutex mtx;
T data;
};
struct Data {
size_t m_index;
char m_data;
};
int main() {
Data orig{10, 'a'};
Data zz; // some initialization
myatomic<Data> dd(orig);
dd.exchange_if_equals(&Data::m_index, 10U, zz);
std::cout << dd.get(&Data::m_index);
}
就像 C++ 一样,硬件 CAS(如 x86-64 或 ARMv8.1)在 asm 中不支持此功能,您必须自己动手。
在 C++ 中,这相当简单:加载原始值并替换其中的一部分。如果另一个核心更改了您不想与之比较的其他部分,这当然会引入虚假故障。
如果可能,使用 unsigned m_index
而不是 size_t
,这样整个结构在典型的 64 位机器上可以容纳 8 个字节,而不是 16 个. 16 字节原子在 x86-64 上速度较慢(尤其是纯负载部分),或者在某些实现 and/or 某些 ISA 上什至根本不是无锁的。请参阅 lock cmpgxchg16b
与当前 GCC/clang.
如果每个 atomic<>
访问单独获取一个锁,那么在整个自定义比较和设置周围使用一个互斥量会好得多。
我写了一个 CAS 尝试(如 cas_weak
)的简单实现作为示例。您可以在 std::atomic<Data>
的模板特化或派生 class 中使用它来为 atomic<Data>
对象提供新的成员函数。
#include <atomic>
struct Data {
// without alignment, clang's atomic<Data> doesn't inline load + CAS?!? even though return d.is_always_lock_free; is true
alignas(long long) char m_data;
unsigned m_index; // this last so compilers can replace it slightly more efficiently
};
inline bool partial_cas_weak(std::atomic<Data> &d, unsigned expected_idx, Data zz, std::memory_order order = std::memory_order_seq_cst)
{
Data expected = d.load(std::memory_order_relaxed);
expected.m_index = expected_idx; // new index, same everything else
return d.compare_exchange_weak(expected, zz, order);
// updated value of "expected" discarded on CAS failure
// If you make this a retry loop, use it instead of repeated d.load
}
这在 x86-64 (Godbolt) 的实践中编译得很好,内联到传递编译时间常量的调用者中 order
(否则 clang 会在该 order
arg 上疯狂分支以获得函数的独立非内联版本)
# clang10.0 -O3 for x86-64
test_pcw(std::atomic<Data>&, unsigned int, Data):
mov rax, qword ptr [rdi] # load the whole thing
shl rsi, 32
mov eax, eax # zero-extend the low 32 bits, clearing m_index
or rax, rsi # OR in a new high half = expected_idx
lock cmpxchg qword ptr [rdi], rdx # the actual 8-byte CAS
sete al # boolean FLAG result into register
ret
不幸的是,编译器太笨了,无法只加载他们实际需要的原子结构部分,而是加载整个部分,然后将他们不需要的部分归零。 (请参阅
不幸的是,GCC 使 stores/reloads 临时堆栈的 asm 变得混乱,导致存储转发停顿。 GCC 还会将 char m_data
之后的填充置零(无论它是第一个还是最后一个成员),如果内存中的实际对象具有非零填充,则可能导致 CAS 总是失败。如果纯存储和初始化总是使它为零,那可能是不可能的。
像 ARM 或 PowerPC 这样的 LL/SC machine 可以在汇编中轻松地做到这一点(compare/branch 是手动完成的,在加载链接和存储条件之间),但是没有库公开那个便携。 (最重要的是因为它不能为像 x86 这样的机器编译,并且因为你在 LL/SC 事务中可以做的事情受到严重限制,本地变量的调试模式 spill/reload 可能导致代码总是失败.)