What is the C++ 11 atomic library equivalent of Java's AtomicMarkableReference<T>

I need this to implement a lock-free linked list. AtomicMarkableReference is a class from the java.util.concurrent.atomic package that wraps a reference to an object of type T together with a boolean mark. These fields can be updated atomically, either together or individually.

Thanks.

Assuming the alignment of your objects is greater than 1, you can use the last bit of the pointer as the boolean mark:

#include <atomic>    // for std::atomic<MarkableReference<T>> below
#include <stdint.h>  // for uintptr_t

template<class T>
class MarkableReference
{
private:
    uintptr_t val;
    static const uintptr_t mask = 1;
public:
    MarkableReference(T* ref = nullptr, bool mark = false)
    {
        val = ((uintptr_t)ref & ~mask) | (mark ? 1 : 0);
    }
    T* getRef(void)
    {
        return (T*)(val & ~mask);
    }
    bool getMark(void)
    {
        return (val & mask);
    }
    // needed so the CAS example below can compare two MarkableReferences
    bool operator==(const MarkableReference& other) const
    {
        return val == other.val;
    }
};

To perform atomic operations, you need to create an atomic variable from this class. For example, if the type of object the reference points to should be int, you can create this variable:

std::atomic<MarkableReference<int>> mRef;
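
This works inside std::atomic<> because MarkableReference is trivially copyable (it is just one uintptr_t), so compare_exchange compares the raw word. If you want, a quick compile-time check of that assumption:

#include <type_traits>

// std::atomic<T> requires T to be trivially copyable.
static_assert(std::is_trivially_copyable<MarkableReference<int>>::value,
              "MarkableReference must stay trivially copyable for std::atomic<>");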

You can apply to the variable mRef any operation that works on a plain atomic. For example, compare-and-set (CAS):

int a;
int b;
...
// Atomically change pair (&a, true) to (&b, false).
MarkableReference<int> old = mRef.load();
if(old == MarkableReference<int>(&a, true))
{
    if(mRef.compare_exchange_strong(old, MarkableReference<int>(&b, false)))
    {
        // Successful CAS
    }
}
// Otherwise 'old' holds a different (pointer, mark) pair. (Unsuccessful CAS)
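
Note that because everything is packed into one word, updating only the mark while keeping whatever pointer is currently stored also has to be a CAS retry loop (std::atomic<MarkableReference<T>> has no fetch_or). A minimal sketch using the class above:

void setMarkOnly(std::atomic<MarkableReference<int>>& ref, bool newMark)
{
    MarkableReference<int> old = ref.load();
    // Rebuild the word with the same pointer but the new mark, retrying
    // until no other thread has changed the (pointer, mark) pair in between.
    while(!ref.compare_exchange_weak(old, MarkableReference<int>(old.getRef(), newMark)))
    {
        // compare_exchange_weak updated 'old' with the current value; try again.
    }
}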

Tsyvarev's idea (using a bit inside the pointer) is interesting, and for some use cases probably more efficient than storing the boolean separately. For other use cases, storing them separately and using a double-width compare-and-exchange to swap both at once may be best. That makes it more efficient to atomically modify only the boolean or only the ref. Storing them together means you always have to do an atomic read-modify-write to change one but not the other (on x86 either lock or [mem], reg, or a lock cmpxchg loop if you also need the old value), instead of just an atomic store that doesn't affect the other (or an atomic xchg if you want the old value).

For a way to implement it with two separate members of a struct, see the question about using compare_exchange_weak on an atomic<two_member_struct>. I'd suggest storing the boolean in a pointer-sized integer, to avoid any padding in the object that needs to be ignored, or that could make a cmpxchg fail when the padding doesn't match but the data does.
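
A hypothetical sketch of that separate-members layout (names are mine, not from the linked question): a pointer plus a pointer-sized mark in one struct, wrapped in std::atomic<>. On x86-64 the 16-byte compare-exchange needs cmpxchg16b (e.g. compile with -mcx16), and whether it's lock-free depends on your compiler and library.

#include <atomic>
#include <stdint.h>

template<class T>
struct RefAndMark {
    T* ref;
    uintptr_t mark;   // pointer-sized, so there's no padding for cmpxchg to trip over
};

template<class T>
bool casBoth(std::atomic<RefAndMark<T>>& target,
             RefAndMark<T>& expected, RefAndMark<T> desired)
{
    // Compares all 16 bytes at once; on failure 'expected' is updated
    // with the value that was actually stored.
    return target.compare_exchange_weak(expected, desired);
}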

If you often update your MarkableReference with a new pointer and boolean together, embedding the data into the pointer is probably good for performance. Otherwise it's probably bad, provided you can get the compiler to make efficient code for the separate-member way.

Also, if you frequently need to get both the pointer and the flag atomically, the embedded-data way is nice for that.


Tsyvarev's implementation needs changes so the boolean can be modified atomically without also setting a new pointer. The MarkableReference class itself should hold an atomic member variable, so it can use fetch_or and similar operations.

This is untested, but it compiles to code that looks correct (on the Godbolt compiler explorer).

On x86-64, you could make this work even for unaligned pointers by using the high bit of the pointer instead of the low bit. x86-64 requires that virtual addresses be canonical, i.e. 64-bit addresses are really sign-extended 48-bit addresses. Future x86-64 CPUs could extend that, though, allowing a full 64-bit virtual address space instead of the current 48 bits. Then you'd have to run programs using this code in a compatibility mode where the OS never gives them addresses that are "non-canonical" according to the old rules.
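
A hypothetical sketch of that high-bit variant (these helpers are mine, not part of the class below): keep the mark in bit 63 and re-sign-extend from bit 47 when recovering the pointer, assuming 48-bit canonical addresses. The full implementation below sticks with the low-bit approach.

#include <stdint.h>

static const uintptr_t high_mask = (uintptr_t)1 << 63;

inline uintptr_t combine_high(void* ref, bool mark) {
    return ((uintptr_t)ref & ~high_mask) | (mark ? high_mask : 0);
}

inline bool mark_from_high(uintptr_t val) {
    return val & high_mask;
}

inline void* ref_from_high(uintptr_t val) {
    // Sign-extend from bit 47 to restore a canonical address.
    // (Right-shifting a negative value is implementation-defined, but is an
    // arithmetic shift on mainstream x86-64 compilers.)
    intptr_t v = (intptr_t)(val << 16) >> 16;
    return (void*)v;
}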

#include <atomic>
#include <stdint.h>
#include <assert.h>

template<class T>
class MarkableReference
{
private:
    std::atomic<uintptr_t> val;
    static const uintptr_t mask = 1;
    uintptr_t combine(T* ref, bool mark) {
        return reinterpret_cast<uintptr_t>(ref) | mark;
    }

public:
    MarkableReference(T* ref, bool mark)
        : val(combine(ref, mark))
    {
        // note that construction of an atomic is not *guaranteed* to be atomic, in case that matters.
        // On most real CPUs, storing a single aligned pointer-sized integer is atomic
        // This does mean that it's not a seq_cst operation, so it doesn't synchronize with anything
        // (and there's no MFENCE required)
        assert((reinterpret_cast<uintptr_t>(ref) & mask) == 0 && "only works with pointers that have the low bit cleared");
    }

    MarkableReference(MarkableReference &other, std::memory_order order = std::memory_order_seq_cst)
        : val(other.val.load(order))
    {}
    // IDK if relaxed is the best choice for this, or if it should exist at all
    MarkableReference &operator=(MarkableReference &other)
    {
        val.store(other.val.load(std::memory_order_relaxed), std::memory_order_relaxed);
        return *this;
    }

/////// Getters  

    T* getRef(std::memory_order order = std::memory_order_seq_cst)
    {
        return reinterpret_cast<T*>(val.load(order) & ~mask);
    }
    bool getMark(std::memory_order order = std::memory_order_seq_cst)
    {
        return (val.load(order) & mask);
    }
    T* getBoth(bool& mark, std::memory_order order = std::memory_order_seq_cst)
    {
        uintptr_t current = val.load(order);
        mark = current & mask;
        return reinterpret_cast<T*>(current & ~mask);
    }

/////// Setters (and exchange)

    // memory_order_acq_rel would be a good choice here
    T* xchgRef(T* ref, std::memory_order order = std::memory_order_seq_cst)
    {
        uintptr_t old = val.load(std::memory_order_relaxed);
        bool success;
        do {
            uintptr_t newval = reinterpret_cast<uintptr_t>(ref) | (old&mask);
            success = val.compare_exchange_weak(old, newval, order);
            // updates old on failure
        } while( !success );

        return reinterpret_cast<T*>(old & ~mask);
    }

    bool cmpxchgBoth_weak(T* &expectRef, bool& expectMark, T* desiredRef, bool desiredMark, std::memory_order order = std::memory_order_seq_cst)
    {
        uintptr_t desired = combine(desiredRef, desiredMark);
        uintptr_t expected = combine(expectRef, expectMark);
        bool status = val.compare_exchange_weak(expected, desired, order);
        expectRef = reinterpret_cast<T*>(expected & ~mask);
        expectMark = expected & mask;
        return status;
    }

    void setRef(T* ref, std::memory_order order = std::memory_order_seq_cst)
    { xchgRef(ref, order); }  // I don't see a way to avoid cmpxchg without a non-atomic read-modify-write of the boolean.
    void setRef_nonatomicBoolean(T* ref, std::memory_order order = std::memory_order_seq_cst)
    {
        uintptr_t old = val.load(std::memory_order_relaxed);  // maybe provide a way to control this order?
        // !!modifications to the boolean by other threads between here and the store will be stepped on!
        uintptr_t newval = combine(ref, old&mask);
        val.store(newval, order);
    }


    void setMark(bool mark, std::memory_order order = std::memory_order_seq_cst)
    {
        if(mark)
            val.fetch_or(mask, order);
        else
            val.fetch_and(~mask, order);
    }

    bool toggleMark(std::memory_order order = std::memory_order_seq_cst)
    {
            return mask & val.fetch_xor(mask, order);
    }

    bool xchgMark(bool mark, std::memory_order order = std::memory_order_seq_cst)
    {
        // setMark might still compile to efficient code if it just called this and let the compile optimize away the fetch part
        uintptr_t old;
        if(mark)
            old = val.fetch_or(mask, order);
        else
            old = val.fetch_and(~mask, order);
        return (old & mask);
        // It might be ideal to compile this to x86 BTS or BTR instructions (when the old value is needed)
        // but clang uses a cmpxchg loop.
    }
};

Usage examples, with asm output showing that this compiles efficiently. (See the Godbolt link above.)

int a;
int b;
MarkableReference<int> mr_a(&a, true);
MarkableReference<int> mr_b(&b, false);


bool getbool(MarkableReference<int> &mr) {
  return mr.getMark();  // using the default memory_order_seq_cst
}
    mov     rax, qword ptr [rdi]
    and     eax, 1
    ret

void storeToRef(MarkableReference<int> &mr, int val) {
  //(*mr.getRef(std::memory_order_relaxed)) = val;  // less code on weakly-ordered CPUs like MIPS
  (*mr.getRef()) = val;
}
}
    mov     rax, qword ptr [rdi]
    and     rax, -2
    mov     dword ptr [rax], esi
    ret

void foo() {
  mr_a.setMark(true, std::memory_order_relaxed);
}
    lock
    or      qword ptr [rip + mr_a], 1
    ret


void bar() {
  mr_b = mr_a;
}
    // no MFENCE since I decided to make operator= use memory_order_relaxed.  acquire / release would also not need MFENCE on x86
    mov     rax, qword ptr [rip + mr_a]
    mov     qword ptr [rip + mr_b], rax
    ret

// this one compiles to a cmpxchg loop and a branch :/
// but I think that's unavoidable
void baz() {
  bool tmp = mr_b.xchgMark(false);
  mr_a.setMark(tmp);
}

    mov     rax, qword ptr [rip + mr_b]

.LBB4_1:                                # =>This Inner Loop Header: Depth=1
    mov     rcx, rax
    and     rcx, -2
    lock cmpxchg qword ptr [rip + mr_b], rcx
    jne     .LBB4_1

    test    al, 1
    jne     .LBB4_3

    lock and     qword ptr [rip + mr_a], -2
    ret

.LBB4_3:
    lock or      qword ptr [rip + mr_a], 1
    ret
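
For the lock-free linked list from the original question, a hypothetical usage sketch of cmpxchgBoth_weak (Node and swingNext are my names, not part of the answer): the classic step of swinging pred.next to a new successor only if it still points where we expect and is still unmarked. Since this is the weak variant it can fail spuriously, so callers normally retry in a loop.

struct Node {
    int key;
    MarkableReference<Node> next;
    Node(int k, Node* n = nullptr) : key(k), next(n, false) {}
};

// Swing pred.next from (expectedSucc, unmarked) to (newSucc, unmarked).
// Returns false if the pointer or the mark changed, or on a spurious failure.
bool swingNext(Node& pred, Node* expectedSucc, Node* newSucc)
{
    Node* expRef = expectedSucc;
    bool expMark = false;           // only succeed while the link is unmarked
    return pred.next.cmpxchgBoth_weak(expRef, expMark, newSucc, false);
}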