C++ 无锁队列因多线程而崩溃
C++ lockless queue crashes with multiple threads
我试图更好地理解在为多线程编码时控制内存顺序。过去我经常使用互斥体来序列化变量访问,但我尽量避免使用互斥体以提高性能。
我有一个指针队列,可能会被许多线程填充并被许多线程消耗。它在单线程下工作正常,但当我 运行 多线程时崩溃。看起来消费者可能会得到指针的副本,这会导致它们被释放两次。这有点难说,因为当我输入任何打印语句时,它 运行 没有崩溃就很好。
首先,我使用预先分配的向量来保存指针。我保留了 3 个原子索引变量来跟踪向量中的哪些元素需要处理。可能值得注意的是,我尝试使用 _queue 类型,其中元素本身是原子的,但似乎没有帮助。这是更简单的版本:
std::atomic<uint32_t> iread;
std::atomic<uint32_t> iwrite;
std::atomic<uint32_t> iend;
std::vector<JEvent*> _queue;
// Write to _queue (may be thread 1,2,3,...)
while(!_done){
uint32_t idx = iwrite.load();
uint32_t inext = (idx+1)%_queue.size();
if( inext == iread.load() ) return kQUEUE_FULL;
if( iwrite.compare_exchange_weak(idx, inext) ){
_queue[idx] = jevent; // jevent is JEvent* passed into this method
while( !_done ){
if( iend.compare_exchange_weak(idx, inext) ) break;
}
break;
}
}
来自同一个 class
// Read from _queue (may be thread 1,2,3,...)
while(!_done){
uint32_t idx = iread.load();
if(idx == iend.load()) return NULL;
JEvent *Event = _queue[idx];
uint32_t inext = (idx+1)%_queue.size();
if( iread.compare_exchange_weak(idx, inext) ){
_nevents_processed++;
return Event;
}
}
我应该强调,我真的很想知道为什么这不起作用。实施一些其他预制包可以让我解决这个问题,但无法帮助我避免以后再次犯同样类型的错误。
更新
我将 Alexandr Konovalov 的回答标记为正确(请参阅下面他的回答中的评论)。如果有人看到此页面,"Write" 部分的更正代码为:
std::atomic<uint32_t> iread;
std::atomic<uint32_t> iwrite;
std::atomic<uint32_t> iend;
std::vector<JEvent*> _queue;
// Write to _queue (may be thread 1,2,3,...)
while(!_done){
uint32_t idx = iwrite.load();
uint32_t inext = (idx+1)%_queue.size();
if( inext == iread.load() ) return kQUEUE_FULL;
if( iwrite.compare_exchange_weak(idx, inext) ){
_queue[idx] = jevent; // jevent is JEvent* passed into this method
uint32_t save_idx = idx;
while( !_done ){
if( iend.compare_exchange_weak(idx, inext) ) break;
idx = save_idx;
}
break;
}
}
对我来说,当有 2 个作者和 1 个 reader 时,可能会出现一个问题。假设第一个作者在
之前停止
_queue[0] = jevent;
并且第二个写入器通过 iend 发出信号,表明它的 _queue[1] 已准备好被读取。然后,reader 通过 iend 看到 _queue[0] 已准备好被读取,所以我们有数据竞争。
我建议您尝试 Relacy Race Detector,它非常适合此类分析。
我试图更好地理解在为多线程编码时控制内存顺序。过去我经常使用互斥体来序列化变量访问,但我尽量避免使用互斥体以提高性能。
我有一个指针队列,可能会被许多线程填充并被许多线程消耗。它在单线程下工作正常,但当我 运行 多线程时崩溃。看起来消费者可能会得到指针的副本,这会导致它们被释放两次。这有点难说,因为当我输入任何打印语句时,它 运行 没有崩溃就很好。
首先,我使用预先分配的向量来保存指针。我保留了 3 个原子索引变量来跟踪向量中的哪些元素需要处理。可能值得注意的是,我尝试使用 _queue 类型,其中元素本身是原子的,但似乎没有帮助。这是更简单的版本:
std::atomic<uint32_t> iread;
std::atomic<uint32_t> iwrite;
std::atomic<uint32_t> iend;
std::vector<JEvent*> _queue;
// Write to _queue (may be thread 1,2,3,...)
while(!_done){
uint32_t idx = iwrite.load();
uint32_t inext = (idx+1)%_queue.size();
if( inext == iread.load() ) return kQUEUE_FULL;
if( iwrite.compare_exchange_weak(idx, inext) ){
_queue[idx] = jevent; // jevent is JEvent* passed into this method
while( !_done ){
if( iend.compare_exchange_weak(idx, inext) ) break;
}
break;
}
}
来自同一个 class
// Read from _queue (may be thread 1,2,3,...)
while(!_done){
uint32_t idx = iread.load();
if(idx == iend.load()) return NULL;
JEvent *Event = _queue[idx];
uint32_t inext = (idx+1)%_queue.size();
if( iread.compare_exchange_weak(idx, inext) ){
_nevents_processed++;
return Event;
}
}
我应该强调,我真的很想知道为什么这不起作用。实施一些其他预制包可以让我解决这个问题,但无法帮助我避免以后再次犯同样类型的错误。
更新 我将 Alexandr Konovalov 的回答标记为正确(请参阅下面他的回答中的评论)。如果有人看到此页面,"Write" 部分的更正代码为:
std::atomic<uint32_t> iread;
std::atomic<uint32_t> iwrite;
std::atomic<uint32_t> iend;
std::vector<JEvent*> _queue;
// Write to _queue (may be thread 1,2,3,...)
while(!_done){
uint32_t idx = iwrite.load();
uint32_t inext = (idx+1)%_queue.size();
if( inext == iread.load() ) return kQUEUE_FULL;
if( iwrite.compare_exchange_weak(idx, inext) ){
_queue[idx] = jevent; // jevent is JEvent* passed into this method
uint32_t save_idx = idx;
while( !_done ){
if( iend.compare_exchange_weak(idx, inext) ) break;
idx = save_idx;
}
break;
}
}
对我来说,当有 2 个作者和 1 个 reader 时,可能会出现一个问题。假设第一个作者在
之前停止 _queue[0] = jevent;
并且第二个写入器通过 iend 发出信号,表明它的 _queue[1] 已准备好被读取。然后,reader 通过 iend 看到 _queue[0] 已准备好被读取,所以我们有数据竞争。
我建议您尝试 Relacy Race Detector,它非常适合此类分析。