C++ 无锁队列因多线程而崩溃

C++ lockless queue crashes with multiple threads

我试图更好地理解在为多线程编码时控制内存顺序。过去我经常使用互斥体来序列化变量访问,但我尽量避免使用互斥体以提高性能。

我有一个指针队列,可能会被许多线程填充并被许多线程消耗。它在单线程下工作正常,但当我 运行 多线程时崩溃。看起来消费者可能会得到指针的副本,这会导致它们被释放两次。这有点难说,因为当我输入任何打印语句时,它 运行 没有崩溃就很好。

首先,我使用预先分配的向量来保存指针。我保留了 3 个原子索引变量来跟踪向量中的哪些元素需要处理。可能值得注意的是,我尝试使用 _queue 类型,其中元素本身是原子的,但似乎没有帮助。这是更简单的版本:

std::atomic<uint32_t> iread;
std::atomic<uint32_t> iwrite;
std::atomic<uint32_t> iend;
std::vector<JEvent*> _queue;

// Write to _queue (may be thread 1,2,3,...)
while(!_done){
    uint32_t idx = iwrite.load();
    uint32_t inext = (idx+1)%_queue.size();
    if( inext == iread.load() ) return kQUEUE_FULL;
    if( iwrite.compare_exchange_weak(idx, inext) ){
        _queue[idx] = jevent; // jevent is JEvent* passed into this method
        while( !_done ){
            if( iend.compare_exchange_weak(idx, inext) ) break;
        }
        break;
    }
}

来自同一个 class

// Read from _queue (may be thread 1,2,3,...)
while(!_done){
    uint32_t idx = iread.load();
    if(idx == iend.load()) return NULL;
    JEvent *Event = _queue[idx];
    uint32_t inext = (idx+1)%_queue.size();
    if( iread.compare_exchange_weak(idx, inext) ){
        _nevents_processed++;
        return Event;
    }
}

我应该强调,我真的很想知道为什么这不起作用。实施一些其他预制包可以让我解决这个问题,但无法帮助我避免以后再次犯同样类型的错误。

更新 我将 Alexandr Konovalov 的回答标记为正确(请参阅下面他的回答中的评论)。如果有人看到此页面,"Write" 部分的更正代码为:

std::atomic<uint32_t> iread;
std::atomic<uint32_t> iwrite;
std::atomic<uint32_t> iend;
std::vector<JEvent*> _queue;

// Write to _queue (may be thread 1,2,3,...)
while(!_done){
    uint32_t idx = iwrite.load();
    uint32_t inext = (idx+1)%_queue.size();
    if( inext == iread.load() ) return kQUEUE_FULL;
    if( iwrite.compare_exchange_weak(idx, inext) ){
        _queue[idx] = jevent; // jevent is JEvent* passed into this method
        uint32_t save_idx = idx;
        while( !_done ){
            if( iend.compare_exchange_weak(idx, inext) ) break;
            idx = save_idx;
        }
        break;
    }
}

对我来说,当有 2 个作者和 1 个 reader 时,可能会出现一个问题。假设第一个作者在

之前停止
 _queue[0] = jevent;

并且第二个写入器通过 iend 发出信号,表明它的 _queue[1] 已准备好被读取。然后,reader 通过 iend 看到 _queue[0] 已准备好被读取,所以我们有数据竞争。

我建议您尝试 Relacy Race Detector,它非常适合此类分析。