无锁列表擦除
Lock-free slist erase
我一直在尝试实现无锁 slist 擦除操作,但我显然遇到了问题。不幸的是我真的很需要它。
为了解决常见的 ABA cmpxchg 相关问题,我编写了 tagged_ptr<> "smart pointer" class,其中嵌入了一个指向存储在 std::atomic<> 中的指针的计数器。每次通过列表中的 CAS 更新指针时,标记值都会递增:
head.compare_exchange_weak(old, old(newptr))
使用来自 old
的递增标签存储 newptr
。
这允许多作者事务,但它没有解决同时更新两个指针的问题。 (例如,使用 tagged_ptr<> 很容易实现堆栈)
参见代码 here。
第 256 行是 erase() 函数:
bool erase(list_node * node) {
std::atomic<tagged_ptr<list_node>>* before;
tagged_ptr<list_node> itr, after;
for(;;) {
// Find previous (or head) before-node-ptr
before = &head;
itr = before->load(std::memory_order_acquire);
while(itr) {
if(itr.get() == node) {
break;
} else if(itr.is_void()) {
// Thread interfered iteration.
before = &head;
itr = before->load(std::memory_order_acquire);
} else {
// Access next ptr
before = &itr->next;
itr = before->load(std::memory_order_acquire);
}
}
after = node->next.load(std::memory_order_acquire);
if(after.is_void() || !itr) {
return false;
}
// Point before-ptr to after. (set head or previous node's next ptr)
if(before->compare_exchange_strong(itr, itr(after))) {
// Set node->next to invalid ptr.
// list iterators will see it and restart their operation.
while(!node->next.compare_exchange_weak(after, after().set_void()))
;
return true;
}
// If *before changed while trying to update it to after, retry search.
}
}
在测试代码中,两个线程同时将节点推入列表,两个线程通过数据搜索随机节点并尝试删除它们。
我遇到的错误是:
- 列表以某种方式变成循环的(列表以 null 终止),因此线程永远卡住,迭代列表永远找不到列表的末尾。
我对您的 tagged_ptr
实施有些疑问。
另外,我对这部分代码有一些疑问:
} else if(itr.is_void()) {
// Thread interfered iteration.
before = &head;
itr = before->load(std::memory_order_acquire);
假设一个线程删除了最后一个节点(您在列表中有 1 个节点并且两个线程都调用了擦除)。剩下的线程会查询头指针,它是空的。这部分代码将进入无限循环,因为它处于 while(itr)
循环中。
这部分也不是原子的:
// Point before-ptr to after. (set head or previous node's next ptr)
if(before->compare_exchange_strong(itr, itr(after))) {
// Set node->next to invalid ptr.
// list iterators will see it and restart their operation.
while(!node->next.compare_exchange_weak(after, after().set_void()))
;
return true;
}
如果 before
被第一个 CAS 修改,你的 node
是一个独立的指针,仍然指向列表。另一个线程可以将其 before
设置为此 node
并修改它和 returns。
老实说,如果您的列表是循环的,那么调试起来并不难,只需在调试器下中断并按照列表进行操作即可。您会看到它何时循环,并且您可以弄清楚它是如何做到的。您也可以为此使用 valgrind。
tagged_ptr
class 很难掌握,使用 "set_void()" 方法将内部 ptr
设置为 0xFF..F
但布尔测试在 while如果 "void",(itr) 将 return 为真。我想这个名字应该是 invalid 而不是 void 如果它是(不是真的),它应该在 bool 运算符中 return false .如果 itr 变为 "void" (据我所知,在上面的代码中是可能的) while(itr)
将无限期循环。
例如,假设您有:
Head:A -> B -> C
然后在删除一些线程后你得到
Thread 2 removing C : Head:A, before = &B on first iteration, exiting the while(itr) loop since itr == C (scheduled here)
Thread 1 removing B : Head:A->C and B->C (scheduled just before line 286 of your example)
Thread 2 resume, and will modify B to B->null (line 283)
and then C->null to C->yourVoid (line 286, then it's scheduled)
Then, thread 1 update B->next to B->yourVoid (useless here for understanding the issue)
You now have A->C->yourVoid
无论何时在这里迭代,你都会有一个无限循环,因为当 itr 搜索到达 C 时,下一步是 "void" 并且从 head 重新开始迭代并不能解决这里的任何问题,它'会给出相同的结果,列表已损坏。
我一直在尝试实现无锁 slist 擦除操作,但我显然遇到了问题。不幸的是我真的很需要它。
为了解决常见的 ABA cmpxchg 相关问题,我编写了 tagged_ptr<> "smart pointer" class,其中嵌入了一个指向存储在 std::atomic<> 中的指针的计数器。每次通过列表中的 CAS 更新指针时,标记值都会递增:
head.compare_exchange_weak(old, old(newptr))
使用来自 old
的递增标签存储 newptr
。
这允许多作者事务,但它没有解决同时更新两个指针的问题。 (例如,使用 tagged_ptr<> 很容易实现堆栈)
参见代码 here。 第 256 行是 erase() 函数:
bool erase(list_node * node) {
std::atomic<tagged_ptr<list_node>>* before;
tagged_ptr<list_node> itr, after;
for(;;) {
// Find previous (or head) before-node-ptr
before = &head;
itr = before->load(std::memory_order_acquire);
while(itr) {
if(itr.get() == node) {
break;
} else if(itr.is_void()) {
// Thread interfered iteration.
before = &head;
itr = before->load(std::memory_order_acquire);
} else {
// Access next ptr
before = &itr->next;
itr = before->load(std::memory_order_acquire);
}
}
after = node->next.load(std::memory_order_acquire);
if(after.is_void() || !itr) {
return false;
}
// Point before-ptr to after. (set head or previous node's next ptr)
if(before->compare_exchange_strong(itr, itr(after))) {
// Set node->next to invalid ptr.
// list iterators will see it and restart their operation.
while(!node->next.compare_exchange_weak(after, after().set_void()))
;
return true;
}
// If *before changed while trying to update it to after, retry search.
}
}
在测试代码中,两个线程同时将节点推入列表,两个线程通过数据搜索随机节点并尝试删除它们。 我遇到的错误是:
- 列表以某种方式变成循环的(列表以 null 终止),因此线程永远卡住,迭代列表永远找不到列表的末尾。
我对您的 tagged_ptr
实施有些疑问。
另外,我对这部分代码有一些疑问:
} else if(itr.is_void()) {
// Thread interfered iteration.
before = &head;
itr = before->load(std::memory_order_acquire);
假设一个线程删除了最后一个节点(您在列表中有 1 个节点并且两个线程都调用了擦除)。剩下的线程会查询头指针,它是空的。这部分代码将进入无限循环,因为它处于 while(itr)
循环中。
这部分也不是原子的:
// Point before-ptr to after. (set head or previous node's next ptr)
if(before->compare_exchange_strong(itr, itr(after))) {
// Set node->next to invalid ptr.
// list iterators will see it and restart their operation.
while(!node->next.compare_exchange_weak(after, after().set_void()))
;
return true;
}
如果 before
被第一个 CAS 修改,你的 node
是一个独立的指针,仍然指向列表。另一个线程可以将其 before
设置为此 node
并修改它和 returns。
老实说,如果您的列表是循环的,那么调试起来并不难,只需在调试器下中断并按照列表进行操作即可。您会看到它何时循环,并且您可以弄清楚它是如何做到的。您也可以为此使用 valgrind。
tagged_ptr
class 很难掌握,使用 "set_void()" 方法将内部 ptr
设置为 0xFF..F
但布尔测试在 while如果 "void",(itr) 将 return 为真。我想这个名字应该是 invalid 而不是 void 如果它是(不是真的),它应该在 bool 运算符中 return false .如果 itr 变为 "void" (据我所知,在上面的代码中是可能的) while(itr)
将无限期循环。
例如,假设您有:
Head:A -> B -> C
然后在删除一些线程后你得到
Thread 2 removing C : Head:A, before = &B on first iteration, exiting the while(itr) loop since itr == C (scheduled here)
Thread 1 removing B : Head:A->C and B->C (scheduled just before line 286 of your example)
Thread 2 resume, and will modify B to B->null (line 283)
and then C->null to C->yourVoid (line 286, then it's scheduled)
Then, thread 1 update B->next to B->yourVoid (useless here for understanding the issue)
You now have A->C->yourVoid
无论何时在这里迭代,你都会有一个无限循环,因为当 itr 搜索到达 C 时,下一步是 "void" 并且从 head 重新开始迭代并不能解决这里的任何问题,它'会给出相同的结果,列表已损坏。