C++11的CAS如何实现Valois的Queue

How to implement Valois' Queue by C++11's CAS

我想通过 C++11 中提供的原子 CAS 实现 JD Valois 的论文实现无锁队列

例如,Valois 的算法定义了一个 Enqueue 函数:

Enqueue(x) {
    q = new Record();
    q->value = x;
    q->next = NULL;

    do {
        p = tail;
    } while( ! CAS(p->next, NULL, q) ); // 1

    CAS(tail, p, q); // 2
}

然后我写了这样的代码

struct Node {
    T * val;
    Node * next;
};
std::atomic<Node *> head, tail;

void enqueue(T * t) {
    Node * nd = new Node();
    nd->val = t;
    nd->next = nullptr;

    std::atomic<Node *> p;
    do {
        p = tail; // 1
    } while ( /* CAS1 */  ! std::atomic_compare_exchange_weak(&(p->next), nullptr, nd) );
    /* CAS2 */ std::atomic_compare_exchange_weak(&tail, p.load(), nd)
}

然后发现CAS1CAS2这两个cas函数用错了。一方面,p->next 不是 std::atomic 的类型,另一方面,atomic_compare_exchange_weak 的第二个参数 expected 需要一个指针。在问题 Why do C++11 CAS operations take two pointer parameters? 中,cas 函数会将 *expected 设置为当前值,这会导致对 nullptr 的取消引用。而且,stmt 1, p = tail 也失败了,因为原子的 operator= 被删除了。那么如何根据 JD Valois 的论文实现无锁队列呢?

您正确描述了所有问题。下一步只是修复代码。

struct Node {
    T * val;
    std::atomic<Node *> next;
};
std::atomic<Node *> head, tail;

void enqueue(T * t) {
    Node * nd = new Node();
    nd->val = t;
    nd->next = nullptr;

    Node *p, *next;
    do {
        p = tail.load(); // 1
        next = nullptr;
    } while (/* CAS1 */ !std::atomic_compare_exchange_weak(&p->next, &next, nd));
    /* CAS2 */ std::atomic_compare_exchange_weak(&tail, &p, nd);
}