带原子的无锁堆栈

Lockfree Stack with atomic

我需要构建一个无锁堆栈实现。我阅读了 this page 并理解了所列无锁推送操作的功能。

现在,我要建立一个类似pop操作的版本。这是我到目前为止所做的,但我认为存在一些并发问题:

template <class T>
bool CASStack<T>::pop(T& ret) {
node<T>* old_head = head.load(std::memory_order_relaxed);

if(old_head == nullptr) {
    return false;
}

// from here on we can assume that there is an element to pop
node<T>* new_head;

do {
    new_head = old_head->next;
} while(!head.compare_exchange_weak(old_head, new_head, std::memory_order_acquire, std::memory_order_relaxed));

ret = old_head->data;

return true;
}

我想如果我在交换后删除old_head也会有麻烦,对吗?

编辑:更新了问题!

你的node<T>* new_head = old_head->next;是一条红鲱鱼;你从不使用这个变量。

在我建议您需要将它放在 do{}while(!CAS) 循环中的评论中,我认为您在做 head.CAS(old_head, new_head)。如果 CAS 确实必须重试,这将出现我正在谈论的问题,将可能过时的指针放入列表中。

但你实际上是在做 head.CAS(old_head, old_head->next),它每次通过循环都会从更新的 old_head 中生成“所需”值。这实际上是正确的,但很难遵循,所以我建议使用 do{}while() 像这样:

// FIXME: this may suffer from ABA problems; see other answers.
node<T>* pop(std::atomic<node<T>*> &head)
{
    // We technically need acquire (or consume) loads of head because we dereference it.
    node<T>* old_head = head.load(std::memory_order_acquire);

    node<T>* new_head;
    do {
        if(old_head == nullptr) {
           // need to re-check because every retry reloads old_head
           // pop in another thread might have emptied the list
            return nullptr;
        }

        new_head = old_head->next;
        // if head still equals old_head this implies the same relation for new_head
    } while(!head.compare_exchange_weak(old_head, new_head,
                                        std::memory_order_acquire));
    // Note the ordering change: acquire for both success and failure

    return old_head;  // defer deletion until some later time
}

(为了处理可能的 ABA 问题,可能需要一个指针+序列号结构。以一种仍然允许有效加载指针的方式来做这件事可能会很麻烦:。请参阅关于这个问题的其他答案也解决了 ABA 问题;我刚才写了这个答案,但不保证整个事情加起来就是一个可用的无锁堆栈!)

Is it allowed to do old_head->next within compare_exchange_weak? Is this still atomic?

CAS 仍然是原子的。编译的任何 compare_exchange_weak 本身都是原子的。不过,编译器会在函数调用 之前评估 args ,因此读取 old_head->next 不是 CAS 执行的原子事务的一部分。它已经被单独读取为临时文件。 (像在 do{}while 循环中那样使用单独的变量显式执行此操作很常见。)

如果 node::nextnodeatomic<> 成员,您应该考虑要用于该加载的内存顺序。但是对于一个纯栈来说,它不一定是原子的,因为链表节点在栈上时永远不会被修改,只有 before 被右边的 next 指针。共享只读访问不是一场比赛。


用作纯堆栈也减少了删除问题:线程无法“窥视”头节点或遍历列表。他们只能在 弹出节点后 查看它,并且 pop 算法确保他们拥有该节点的独占所有权(并负责删除它)。

但是pop()本身需要从head节点加载。如果另一个线程与我们竞争并且 return 将 head 的内存分配给 OS,我们可能会出错。所以我们 有一个删除问题 like RCU does,就像我在评论中提到的那样。

简单地为其他东西重用内存在 大多数 C++ 实现上不是问题,但是:我们会读取 old_head->next 的垃圾值,但是 CAS会失败(因为 head 指针必须在旧的 head 对象被释放之前改变)所以我们永远不会对我们加载的虚假值做任何事情。但是我们的原子加载与非原子存储竞争仍然是 C++ UB。但是编译器必须证明这个竞争实际上 确实 在它被允许发出除正常 asm 之外的任何东西之前发生,并且所有主流 CPU 在 asm 中对这样的竞争没有任何问题.

但是除非你能保证 free()delete 只是将内存放在空闲列表中,即它们不会 munmap 它在负载 headold_head->next 的取消引用,上述推理并不能使调用者立即删除 pop 的 return 值是安全的。这仅意味着问题不太可能发生(并且很难通过简单测试检测到)。


内存排序

我们加载 head 然后期望该指针指向有用的值。 (即 old_head->next)。这正是 memory_order_consume 给我们的。但它很难使用,也很难优化,以至于编译器只是将它加强为 acquire,这使得无法测试使用 consume 的代码。 所以我们真的需要 acquire 来加载所有 head

(在当前编译器中使用 consume 等同于 acquire。如果您 实际上 需要无障碍的数据依赖排序性能,请参阅 C++11: the difference between memory_order_relaxed and memory_order_consume 如何尝试安全地使用 relaxed。)

请注意,从我们弹出的节点中获取值也取决于内存顺序,但我 认为 如果我们不需要 old_head->next 我们可以使用 relaxed除了在 CAS 的 success 一侧(我们至少需要 consume,所以在实践中 acquire)之外的任何地方。

(在主流 C++ 实现上,我们可能可以在所有架构上使用 relaxed,除了 DEC Alpha AXP,这是 90 年代著名的弱序 RISC。编译器几乎肯定会创建数据依赖于加载的指针,因为它没有任何其他方法来访问它需要的值。除了 Alpha 之外的所有“普通”硬件都免费提供 mo_consume 样式依赖排序。因此使用 relaxed 进行测试会永远不会出现问题,除非你有一种罕见的 Alpha 模型,它实际上 可以 在硬件中产生这种重新排序,并且有一个有效的 C++11 实现。但它仍然是“错误的”,并且可能会因编译时重新排序而中断,或者我可能遗漏了一些东西并且 relaxed 实际上可能会在实践中中断而不会内联到更复杂的东西 + 常量传播。)

请注意,这些 mo_acquire 加载 synchronize-with mo_release 存储在推动当前 head[= 指向的对象的线程中115=]。这可以防止来自 old_head 的非原子负载与非原子存储竞争到推送它的线程中的节点。

到目前为止,这是我的解决方案:

template <class T>
bool CASStack<T>::pop(T& ret) {
    node<T>* new_head;

    // get the current head
    node<T>* old_head = head.load(std::memory_order_relaxed);

    do {
        // it is a null pointer iff our stack is empty
        if(old_head == nullptr) {
            return false;
        }

        // otherwise, we can dereference it and access its next node
        new_head = old_head->next;
    } while(!head.compare_exchange_weak(old_head, new_head, std::memory_order_acquire, std::memory_order_relaxed));

    // finally write the popped value into ret
    ret = old_head->data;
    return true;
}

非常感谢您的评价。 我知道这段代码有两个问题:

1) 如果另一个线程在 head.loadnullptr 比较之间压入一个元素,我的算法不会弹出它。我不知道如何解决这个问题。

2) 在 push 操作中,元素是用 new 创建的。如果我在 return true; 之前添加 delete old_head;,我的代码就会崩溃。所以我知道这个算法有内存泄漏。我可以应用 解决方案吗?

想象一下,在加载 old_head 和取消引用 old_head->next 之间,cpu 被中断转移并且很长时间没有回到这个序列(天、周等)。 同时,其他一些线程从您的堆栈中弹出“old_head”,对其进行处理,然后将其返回到堆中,并可能将其重新用于另一个对象。

它对“推送”起作用的原因是“推送代码”拥有要推送的对象。 “pop”并非如此——pop 正在发现对象,然后试图获得它的所有权。要使用“无锁”,您必须能够同时执行这两个操作;这使得链表变得困难,如果不是不可用的话。

相比之下,对于数组,您知道“next”是“top - 1”,所以:

do {
   x = stack[temp = top];
} while (cswap(&top, temp, temp-1) != temp);

非常接近。问题是您需要将世代计数编码到 top 中,以便对“top”的每个分配都是唯一的:

struct uuidx { int index; very_large_int sequence; };
extern (volatile, atomic, whatever) struct uuidx top;

...
struct uuidx temp, next;
do {
    x = stack[(temp = top).index];
    next = (struct uuidx){.index = temp.index - 1,
                 .sequence = temp.sequence+1};
} while (cswap(&top, temp, next) != temp)

@PeterCordes 的回答很有启发性,但并没有解决所有问题。

我正在写我自己的答案,因为我也必须实现无锁堆栈,但在 pop 操作的重入测试中失败了。

Cordes先生给出的实现没有依赖底层ABA problem

了解重入问题:当您尝试弹出堆栈头部时,CAS (compare_and_exchange) 操作仅在堆栈头部 "is the same".

时继续

是"the same"是这里的关键:只要CAS指令走,相同就意味着指针相同,但数据不一定如此——如果同时,堆栈会受到第二个线程的正确弹出? ...然后,另一个线程(第三个线程)推回一个新元素,它恰好存储在与线程 1 中头部相同的地址?

在这种情况下,线程 #1 中的 CAS 指令会成功,但考虑到 ->next 指针不再有效。

避免此 ABA 问题的正确方法似乎是存储一个由头指针和下一个指针组成的原子头结构。

建议的解决方案在这里实现 -- MTL's UnorderedArrayBasedReentrantStack

重入测试在这里 -- https://github.com/zertyz/MTL/blob/master/tests/cpp/UnorderedArrayBasedReentrantStackSpikes.cpp

在 x86_64 和 ARM 32 和 64 位上测试。

希望这对某人有所帮助。

如果您希望多个线程同时执行 pushpop 操作,则无法实现无锁堆栈。

在那种情况下,修改堆栈指针和访问堆栈上的读取或写入数据应该以原子方式完成。如果不是你有两种情况,这两种情况都不一致,push 内存操作顺序:

  • 堆栈指针增量发生在将数据写入堆栈之前:在这种情况下,可以在中间安排并发 pop 操作并从堆栈中读取垃圾数据。
  • 堆栈指针递增发生在向堆栈写入数据之后:在这种情况下并发push可能会使我们在指针递增之前刚刚写入的数据无效

类似地,如果并发 pop 操作是可能的,则读取数据和修改堆栈指针可以与产生无效状态的各种操作交错:

  • 如果pop在指针递减后读取数据,并发push可能会在读取数据之前覆盖数据
  • 如果pop在指针递减之前读取数据,并发pop后跟[=​​11=]可能会导致旧数据被读取两次,而新数据丢失

如果只有一个线程在推送,并且只有一个线程在弹出数据,则(部分)无锁实现是可能的:

  • push 可以读取具有 acquire 语义的堆栈指针,并安全地覆盖堆栈指针,因为没有其他线程正在使用该内存区域,然后自动更新堆栈指针以发出信号pop 指针下方数据有效的线程(使用 release semantic/memory 排序)
  • pop可以读取具有acquire语义的堆栈指针值,读取其上的数据,然后比较交换具有release语义的值以通知另一个线程弹出值使用的内存再次可用于新值。在比较交换失败时,意味着更多的值被压入,它可以在新的栈顶读取新的值。

在代码中:

void push(T value) {
    auto stp = stack_pointer.load(memory_order_acquire);
    stack[++stp] = value;
    stack_pointer.store(stp, memory_order_release);
}

T pop() {
    auto stp = stack_pointer.load(memory_order_acquire);
    while(true) {
        auto value = stack[stp];
        if (stack_pointer.atomic_compare_exchange_weak(stp,stp-1,memory_order_release,memory_order_acquire)) {
            return value;
        }
    }
}

此实现是“部分无锁”的,因为如果并发 push 发生,pop 实现必须自旋,这是一种锁。