无锁列表擦除

Lock-free slist erase

我一直在尝试实现无锁 slist 擦除操作,但我显然遇到了问题。不幸的是我真的很需要它。

为了解决常见的 ABA cmpxchg 相关问题,我编写了 tagged_ptr<> "smart pointer" class,其中嵌入了一个指向存储在 std::atomic<> 中的指针的计数器。每次通过列表中的 CAS 更新指针时,标记值都会递增: head.compare_exchange_weak(old, old(newptr)) 使用来自 old 的递增标签存储 newptr。 这允许多作者事务,但它没有解决同时更新两个指针的问题。 (例如,使用 tagged_ptr<> 很容易实现堆栈)

参见代码 here。 第 256 行是 erase() 函数:

bool erase(list_node * node) {
    std::atomic<tagged_ptr<list_node>>* before;
    tagged_ptr<list_node> itr, after;
    for(;;) {
        // Find previous (or head) before-node-ptr
        before = &head;
        itr = before->load(std::memory_order_acquire);
        while(itr) {
            if(itr.get() == node) {
                break;
            } else if(itr.is_void()) {
                // Thread interfered iteration.
                before = &head;
                itr = before->load(std::memory_order_acquire);
            } else {
                // Access next ptr
                before = &itr->next;
                itr = before->load(std::memory_order_acquire);
            }
        }

        after = node->next.load(std::memory_order_acquire);
        if(after.is_void() || !itr) {
            return false;
        }

        // Point before-ptr to after. (set head or previous node's next ptr)
        if(before->compare_exchange_strong(itr, itr(after))) {
            // Set node->next to invalid ptr.
            // list iterators will see it and restart their operation.
            while(!node->next.compare_exchange_weak(after, after().set_void()))
                ;
            return true;
        }
        // If *before changed while trying to update it to after, retry search.
    }
}

在测试代码中,两个线程同时将节点推入列表,两个线程通过数据搜索随机节点并尝试删除它们。 我遇到的错误是:

我对您的 tagged_ptr 实施有些疑问。 另外,我对这部分代码有一些疑问:

         } else if(itr.is_void()) {
                // Thread interfered iteration.
                before = &head;
                itr = before->load(std::memory_order_acquire);

假设一个线程删除了最后一个节点(您在列表中有 1 个节点并且两个线程都调用了擦除)。剩下的线程会查询头指针,它是空的。这部分代码将进入无限循环,因为它处于 while(itr) 循环中。

这部分也不是原子的:

            // Point before-ptr to after. (set head or previous node's next ptr)
            if(before->compare_exchange_strong(itr, itr(after))) {
                // Set node->next to invalid ptr.
                // list iterators will see it and restart their operation.
                while(!node->next.compare_exchange_weak(after, after().set_void()))
                    ;
                return true;
}

如果 before 被第一个 CAS 修改,你的 node 是一个独立的指针,仍然指向列表。另一个线程可以将其 before 设置为此 node 并修改它和 returns。 老实说,如果您的列表是循环的,那么调试起来并不难,只需在调试器下中断并按照列表进行操作即可。您会看到它何时循环,并且您可以弄清楚它是如何做到的。您也可以为此使用 valgrind。

tagged_ptr class 很难掌握,使用 "set_void()" 方法将内部 ptr 设置为 0xFF..F 但布尔测试在 while如果 "void",(itr) 将 return 为真。我想这个名字应该是 invalid 而不是 void 如果它是(不是真的),它应该在 bool 运算符中 return false .如果 itr 变为 "void" (据我所知,在上面的代码中是可能的) while(itr) 将无限期循环。

例如,假设您有:

Head:A -> B -> C

然后在删除一些线程后你得到

Thread 2 removing C : Head:A, before = &B on first iteration, exiting the while(itr) loop since itr == C (scheduled here)
Thread 1 removing B : Head:A->C and B->C (scheduled just before line 286 of your example)
Thread 2 resume, and will modify B to B->null (line 283)
and then C->null to C->yourVoid (line 286, then it's scheduled)

Then, thread 1 update B->next to B->yourVoid (useless here for understanding the issue)
You now have A->C->yourVoid

无论何时在这里迭代,你都会有一个无限循环,因为当 itr 搜索到达 C 时,下一步是 "void" 并且从 head 重新开始迭代并不能解决这里的任何问题,它'会给出相同的结果,列表已损坏。