What is the C++ 11 atomic library equivalent of Java's AtomicMarkableReference<T>
I need this to implement a lock-free linked list.
AtomicMarkableReference is an object from the java.util.concurrent.atomic package that encapsulates both a reference to an object of type T and a boolean mark. These fields can be updated atomically, either together or individually.
Thank you.
Assuming the alignment of your objects is greater than 1, you can use the last (lowest) bit of the pointer as the boolean mark:
#include <cstdint>   // for uintptr_t

template<class T>
class MarkableReference
{
private:
    uintptr_t val;                      // the pointer bits, with the mark stored in the low bit
    static const uintptr_t mask = 1;
public:
    MarkableReference(T* ref = NULL, bool mark = false)
    {
        val = ((uintptr_t)ref & ~mask) | (mark ? 1 : 0);
    }
    T* getRef(void)
    {
        return (T*)(val & ~mask);
    }
    bool getMark(void)
    {
        return (val & mask);
    }
    bool operator==(const MarkableReference& other) const
    {
        return val == other.val;        // needed for the value comparison in the CAS example below
    }
};
To perform atomic operations you need to create an atomic variable from this class. For example, if the type of the object the reference points to should be int, you can create this variable:
atomic<MarkableReference<int>> mRef;
To the variable mRef you can apply any operation that works on ordinary atomics. For example, compare-and-set (CAS):
int a;
int b;
...
// Atomically change pair (&a, true) to (&b, false).
MarkableReference<int> old = mRef.load();
if(old == MarkableReference<int>(&a, true))
{
    if(mRef.compare_exchange_strong(old, MarkableReference<int>(&b, false)))
    {
        // Successful CAS
    }
}
// Otherwise 'old' contains the current value. (Unsuccessful CAS)
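As an illustration of the question's requirement that the fields can also be updated individually, here is a minimal sketch (my addition, not part of the answer) of changing only the mark with this design. Because the pointer and the mark share one word, updating just one of them needs a compare_exchange retry loop on the whole MarkableReference:

void setMarkOnly(std::atomic<MarkableReference<int>> &ref, bool mark)
{
    MarkableReference<int> old = ref.load();
    // compare_exchange_weak refreshes 'old' with the current value on failure,
    // so each retry rebuilds the desired value from the latest pointer.
    while (!ref.compare_exchange_weak(old, MarkableReference<int>(old.getRef(), mark)))
    {
    }
}

The next answer makes exactly this observation, and moves the atomic inside the class so the mark can be flipped with a single fetch_or instead.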
Tsyvarev's idea (using one bit inside the pointer) is interesting, and for some use cases it is probably more efficient than storing the boolean separately. For other use cases it would be better to store them separately and use a double-width compare-and-exchange to swap both at once. That makes it more efficient to atomically modify just the boolean or just the reference. Storing them together means you always have to do an atomic read-modify-write to change one but not the other (on x86 that is a lock or [mem], reg, or a lock cmpxchg loop if you also need the old value), instead of just an atomic store that doesn't affect the other (or an atomic xchg if you want the old value).
To implement this with two separate members of one struct, see the question about using compare_exchange_weak on an atomic<two_member_struct>. I'd suggest storing the boolean in a pointer-sized integer, to avoid needing to ignore any padding in the object, or padding that can make a cmpxchg fail when the padding doesn't match even though the data matches.
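For illustration, here is a minimal sketch (my own, not code from either answer) of that separate-members layout: a trivially copyable two-member struct wrapped in std::atomic, with the mark widened to a pointer-sized integer so the struct has no padding bytes that could make a compare_exchange fail when only the padding differs:

#include <atomic>
#include <cstdint>

template<class T>
struct RefAndMark {        // trivially copyable; no padding on typical 64-bit targets
    T*        ref;
    uintptr_t mark;        // 0 or 1; pointer-sized instead of bool to avoid padding
};

template<class T>
bool casBoth(std::atomic<RefAndMark<T>> &a,
             RefAndMark<T> &expected, RefAndMark<T> desired)
{
    // On x86-64 this is a 16-byte CAS (cmpxchg16b) when the implementation supports
    // it (e.g. compiled with -mcx16); otherwise the library may fall back to a lock.
    return a.compare_exchange_weak(expected, desired);
}

This only shows the whole-struct CAS that the referenced question is about; how to also access the two members individually (the other advantage mentioned above) is target-specific and not shown here.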
If you often update your MarkableReference with a new pointer and boolean together, embedding the data into the pointer is probably good for performance. Otherwise it's probably bad, if you can get the compiler to make efficient code for the separate-members way. Also, if you often need to get the pointer and the flag atomically as a pair, the embedded-data way is nice for that.
Tsyvarev's implementation needs changes so the boolean can be modified atomically without also setting a new pointer. The class MarkableReference should itself hold an atomic member variable, so it can use fetch_or and the like.
This is untested, but it compiles to code that looks correct (on the Godbolt compiler explorer).
On x86-64, you can make this work even for unaligned pointers by using the high bit of the pointer instead of the low bit, since 64-bit addresses are really sign-extended 48-bit addresses. Future x86-64 CPUs could extend this, allowing a full 64-bit virtual address space instead of the current 48 bits; then you'd have to run programs that use this code in a compatibility mode where the OS never gives them addresses that are "non-canonical" according to the old rules.
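As an aside, here is a rough sketch (my assumption, not code from this answer) of that high-bit variant. It relies on today's canonical 48-bit addresses, where bits 63 and 62 of a real pointer are equal, so bit 63 can be rebuilt after the mark overwrites it; the low-bit implementation the answer actually uses follows below.

#include <cstdint>

static const uintptr_t high_mask = uintptr_t(1) << 63;

inline uintptr_t combine_high(void* ref, bool mark) {
    // overwrite bit 63 with the mark
    return (reinterpret_cast<uintptr_t>(ref) & ~high_mask) | (mark ? high_mask : uintptr_t(0));
}

inline void* ref_from_high(uintptr_t v) {
    uintptr_t u = v & ~high_mask;
    if (u & (uintptr_t(1) << 62))   // restore bit 63 from bit 62 (canonical address)
        u |= high_mask;
    return reinterpret_cast<void*>(u);
}

inline bool mark_from_high(uintptr_t v) {
    return v & high_mask;
}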
#include <atomic>
#include <stdint.h>   // for uintptr_t
#include <assert.h>

template<class T>
class MarkableReference
{
private:
    std::atomic<uintptr_t> val;
    static const uintptr_t mask = 1;

    uintptr_t combine(T* ref, bool mark) {
        return reinterpret_cast<uintptr_t>(ref) | mark;
    }

public:
    MarkableReference(T* ref, bool mark)
        : val(combine(ref, mark))
    {
        // note that construction of an atomic is not *guaranteed* to be atomic, in case that matters.
        // On most real CPUs, storing a single aligned pointer-sized integer is atomic
        // This does mean that it's not a seq_cst operation, so it doesn't synchronize with anything
        // (and there's no MFENCE required)
        assert((reinterpret_cast<uintptr_t>(ref) & mask) == 0 && "only works with pointers that have the low bit cleared");
    }

    MarkableReference(MarkableReference &other, std::memory_order order = std::memory_order_seq_cst)
        : val(other.val.load(order))
    {}
    // IDK if relaxed is the best choice for this, or if it should exist at all
    MarkableReference &operator=(MarkableReference &other)
    {
        val.store(other.val.load(std::memory_order_relaxed), std::memory_order_relaxed);
        return *this;
    }

/////// Getters

    T* getRef(std::memory_order order = std::memory_order_seq_cst)
    {
        return reinterpret_cast<T*>(val.load(order) & ~mask);
    }
    bool getMark(std::memory_order order = std::memory_order_seq_cst)
    {
        return (val.load(order) & mask);
    }
    T* getBoth(bool& mark, std::memory_order order = std::memory_order_seq_cst)
    {
        uintptr_t current = val.load(order);
        mark = current & mask;
        return reinterpret_cast<T*>(current & ~mask);
    }

/////// Setters (and exchange)

    // memory_order_acq_rel would be a good choice here
    T* xchgRef(T* ref, std::memory_order order = std::memory_order_seq_cst)
    {
        uintptr_t old = val.load(std::memory_order_relaxed);
        bool success;
        do {
            uintptr_t newval = reinterpret_cast<uintptr_t>(ref) | (old & mask);
            success = val.compare_exchange_weak(old, newval, order);  // updates old on failure
        } while( !success );
        return reinterpret_cast<T*>(old & ~mask);
    }

    bool cmpxchgBoth_weak(T* &expectRef, bool& expectMark, T* desiredRef, bool desiredMark, std::memory_order order = std::memory_order_seq_cst)
    {
        uintptr_t desired = combine(desiredRef, desiredMark);
        uintptr_t expected = combine(expectRef, expectMark);
        bool status = val.compare_exchange_weak(expected, desired, order);
        expectRef = reinterpret_cast<T*>(expected & ~mask);
        expectMark = expected & mask;
        return status;
    }

    void setRef(T* ref, std::memory_order order = std::memory_order_seq_cst)
    { xchgRef(ref, order); }  // I don't see a way to avoid cmpxchg without a non-atomic read-modify-write of the boolean.

    void setRef_nonatomicBoolean(T* ref, std::memory_order order = std::memory_order_seq_cst)
    {
        uintptr_t old = val.load(std::memory_order_relaxed);  // maybe provide a way to control this order?
        // !!modifications to the boolean by other threads between here and the store will be stepped on!
        uintptr_t newval = combine(ref, old & mask);
        val.store(newval, order);
    }

    void setMark(bool mark, std::memory_order order = std::memory_order_seq_cst)
    {
        if(mark)
            val.fetch_or(mask, order);
        else
            val.fetch_and(~mask, order);
    }

    bool toggleMark(std::memory_order order = std::memory_order_seq_cst)
    {
        return mask & val.fetch_xor(mask, order);
    }

    bool xchgMark(bool mark, std::memory_order order = std::memory_order_seq_cst)
    {
        // setMark might still compile to efficient code if it just called this and let the compiler optimize away the fetch part
        uintptr_t old;
        if(mark)
            old = val.fetch_or(mask, order);
        else
            old = val.fetch_and(~mask, order);
        return (old & mask);
        // It might be ideal to compile this to x86 BTS or BTR instructions (when the old value is needed)
        // but clang uses a cmpxchg loop.
    }
};
Example uses, with asm output showing that it compiles efficiently. (See the Godbolt link above.)
int a;
int b;
MarkableReference<int> mr_a(&a, true);
MarkableReference<int> mr_b(&b, false);
bool getbool(MarkableReference<int> &mr) {
    return mr.getMark();  // using the default memory_order_seq_cst
}
mov rax, qword ptr [rdi]
and eax, 1
ret
void storeToRef(MarkableReference<int> &mr, int val) {
    //(*mr.getRef(memory_order_relaxed)) = val;  // less code on weakly-ordered CPUs like MIPS
    (*mr.getRef()) = val;
}
mov rax, qword ptr [rdi]
and rax, -2
mov dword ptr [rax], esi
ret
void foo() {
    mr_a.setMark(true, std::memory_order_relaxed);
}
lock
or qword ptr [rip + mr_a], 1
ret
void bar() {
    mr_b = mr_a;
}
// no MFENCE since I decided to make operator= use memory_order_relaxed. acquire / release would also not need MFENCE on x86
mov rax, qword ptr [rip + mr_a]
mov qword ptr [rip + mr_b], rax
ret
// this one compiles to a cmpxchg loop and a branch :/
// but I think that's unavoidable
void baz() {
    bool tmp = mr_b.xchgMark(false);
    mr_a.setMark(tmp);
}
mov rax, qword ptr [rip + mr_b]
.LBB4_1: # =>This Inner Loop Header: Depth=1
mov rcx, rax
and rcx, -2
lock cmpxchg qword ptr [rip + mr_b], rcx
jne .LBB4_1
test al, 1
jne .LBB4_3
lock and qword ptr [rip + mr_a], -2
ret
.LBB4_3:
lock or qword ptr [rip + mr_a], 1
ret