C++11 multi-reader / multi-writer queue using atomics for object state and perpetually incremented indexes

I am implementing a multi-reader-thread, multi-writer-thread object pool using atomics and a circular buffer.

It is hard to investigate, because instrumenting the code makes the bug disappear!

Model

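Producers (writer threads) request an Element from the Ring in order to 'prepare' it. When done, the writer thread changes the element's state so that a reader can 'consume' it; the element is then in state::Ready.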

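Consumers (reader threads) request an object from the Ring in order to 'read' it. After the reader 'releases' the object, it goes back to state::Unused and is available for writing again. A request may fail if no object is available: a reader fails when the next object is not in state::Ready, and a writer fails when the next free object in the Ring is not in state::Unused.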

There are two classes, Element and Ring.

Element :

To summarize, an element cycles through Unused => LockForWrite => Ready => LockForRead => Unused.
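The cycle can be checked on a single element without any threads. This is only a sketch: the `Element` below mirrors the class from the question, while `current()` and `demo_cycle()` are hypothetical helpers added for the demonstration.

```cpp
#include <atomic>
#include <cassert>

enum state : int { Unused, LockForWrite, Ready, LockForRead };

class Element {
    std::atomic<state> _state;
public:
    Element() : _state(Unused) {}

    // Ready => LockForRead, fails in any other state
    bool lock_for_read() { state s = Ready; return _state.compare_exchange_strong(s, LockForRead); }
    void unlock_read() { _state.store(Unused); }

    // Unused => LockForWrite, fails in any other state
    bool lock_for_write() { state s = Unused; return _state.compare_exchange_strong(s, LockForWrite); }
    void unlock_write() { _state.store(Ready); }

    // Hypothetical accessor, used only by the demo below.
    state current() const { return _state.load(); }
};

// Walks one element through a full cycle and returns true
// if every transition succeeded or failed as expected.
bool demo_cycle() {
    Element e;
    bool ok = true;
    ok = ok && !e.lock_for_read();   // nothing has been written yet
    ok = ok && e.lock_for_write();   // Unused => LockForWrite
    ok = ok && !e.lock_for_write();  // already owned by a writer
    ok = ok && !e.lock_for_read();   // not Ready yet
    e.unlock_write();                // LockForWrite => Ready
    ok = ok && e.current() == Ready;
    ok = ok && e.lock_for_read();    // Ready => LockForRead
    ok = ok && !e.lock_for_read();   // already owned by a reader
    e.unlock_read();                 // LockForRead => Unused
    ok = ok && e.current() == Unused;
    return ok;
}
```

Each lock is a compare-and-swap from exactly one source state, so on a single element at most one thread can own it at a time. The trouble described in the rest of the question comes from the indexes, not from these transitions.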

Ring

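When a reader successfully locks an object for reading (LockForRead), the _read index is incremented. When a writer successfully locks an object for writing (LockForWrite), the _write index is incremented.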

main :

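We add some writer and reader threads to a vector, all sharing the same Ring. Each thread simply tries to get an element with get_for_read or get_for_write and releases it afterwards.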

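Based on the Element transitions, everything should be fine. However, the Ring can be observed to block at some point: the element that _write % _elems.size() points to is in state::Ready and, symmetrically, the element that _read % _elems.size() points to is in state::Unused. Both together = deadlock.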

#include<atomic>
#include<vector>
#include<thread>
#include<iostream>
#include<cstdint>
#include<functional> // std::function, used in main()

typedef enum : int
{
    Unused, LockForWrite, Ready,  LockForRead
}state;

class Element
{
    std::atomic<state> _state;
public:
    Element():_state(Unused){ }

    // a reader needs to successfully make the transition Ready => LockForRead
    bool lock_for_read() { state s = Ready; return _state.compare_exchange_strong(s, LockForRead); }
    void unlock_read() { state s = Unused; _state.store(s); }

    // a writer needs to successfully make the transition Unused => LockForWrite
    bool lock_for_write() { state s = Unused; return _state.compare_exchange_strong(s, LockForWrite); }
    void unlock_write() { state s = Ready;  _state.store(s); }
};

class Ring
{
    std::vector<Element> _elems;
    std::atomic<int64_t> _read, _write;

public:
    Ring(size_t capacity)
        : _elems(capacity), _read(0), _write(0) {}

    Element * get_for_read() {
        Element * ret = &_elems[ _read.load() % _elems.size() ];
        if (!ret->lock_for_read()) // if success, the object belongs to the caller thread as reader
            return NULL;
        _read.fetch_add(1); // success! incr _read index 
        return ret;
    }
    Element * get_for_write() {
        Element * ret = &_elems[ _write.load() % _elems.size() ];
        if (!ret->lock_for_write())// if success, the object belongs to the caller thread as writer
            return NULL;
        _write.fetch_add(1); // success! incr _write index
        return ret;
    }
    void release_read(Element* e) { e->unlock_read();}
    void release_write(Element* e) { e->unlock_write();}
};

int main()
{

    const int capacity = 10; // easy to process modulo

    std::atomic<bool> stop(false); // direct-init: std::atomic is not copyable, so "= false" is ill-formed before C++17

    Ring ring(capacity);

    std::function<void()> writer_job = [&]()
    {
        std::cout << "writer starting" << std::endl;
        Element * e;
        while (!stop)
        {
            if (!(e = ring.get_for_write())) 
                continue;
            // do some real writer job ...
            ring.release_write(e);
        }
    };
    std::function<void()> reader_job = [&]()
    {
        std::cout << "reader starting" << std::endl;
        Element * e;
        while (!stop)
        {
            if (!(e = ring.get_for_read())) 
                continue;
            // do some real reader job ...
            ring.release_read(e);
        }
    };

    int nb_writers = 1;
    int nb_readers = 2;

    std::vector<std::thread> threads;
    threads.reserve(nb_writers + nb_readers);

    std::cout << "adding writers" << std::endl;
    while (nb_writers--)
        threads.push_back(std::thread(writer_job));

    std::cout << "adding readers" << std::endl; 
    while (nb_readers--)
        threads.push_back(std::thread(reader_job));

    // wait user key press, halt in debugger after 1 or 2 seconds
    // in order to reproduce problem and watch ring
    std::cin.get();

    stop = true;

    std::cout << "waiting all threads...\n";
    for (auto & th : threads)
        th.join();

    std::cout << "end" << std::endl;
}

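The debugger watch screenshot was taken by pausing the program after it had run for 1 second. As you can see, _read points to element 8, which is marked state::Unused, so no transition can unblock this reader except a write; but the _write index points to element 0, which is in state::Ready!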

My question: what did I miss? Structurally I am sure the sequence is right, but I am missing some atomic trick ...

Tested on: rhel5/gcc 4.1.2, rhel 7/gcc 4.8, win10/ms visual 2015, win10/mingw

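You do not have an atomic section around the increments of the two shared counters _read and _write. That looks bad to me: the element you lock and the index you increment can end up unrelated.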

Imagine this scenario: one reader R1 and one writer W are cooperating happily.

Reader 2 (R2) executes: Element * ret = &_elems[ _read.load() % _elems.size() ]; and gets pushed off the CPU.

R1 and W keep playing together, so the positions of _read and _write are now arbitrary w.r.t. the element ret that R2 points to.

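At some point R2 gets scheduled again, and it so happens that *ret is readable (which is entirely possible, since R1 and W may have gone around the ring a few times).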

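Ouch: as you can see, R2 will read it and increment _read, but _read has nothing to do with ret anymore. This creates holes, i.e. elements that have not been read yet but lie below the _read index.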
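The scenario can be replayed deterministically on one thread by splitting the question's get_for_read into its two steps. This is only an illustrative sketch: `peek_read`, `claim_read`, `write_one`, `read_one` and `replay_deadlocks` are hypothetical names, and a capacity of 2 is assumed to keep the trace short.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

enum state : int { Unused, LockForWrite, Ready, LockForRead };

struct Element {
    std::atomic<state> _state{Unused};
    bool lock_for_read() { state s = Ready; return _state.compare_exchange_strong(s, LockForRead); }
    void unlock_read() { _state.store(Unused); }
    bool lock_for_write() { state s = Unused; return _state.compare_exchange_strong(s, LockForWrite); }
    void unlock_write() { _state.store(Ready); }
};

struct Ring {
    std::vector<Element> _elems;
    std::atomic<std::int64_t> _read{0}, _write{0};
    explicit Ring(std::size_t capacity) : _elems(capacity) {}

    // The question's get_for_read, split so the replay can pause between the two steps.
    Element* peek_read() { return &_elems[_read.load() % _elems.size()]; }
    Element* claim_read(Element* e) {
        if (!e->lock_for_read()) return nullptr;
        _read.fetch_add(1); // increments whatever _read is now, not the index that was peeked
        return e;
    }
    Element* get_for_read() { return claim_read(peek_read()); }

    Element* get_for_write() {
        Element* e = &_elems[_write.load() % _elems.size()];
        if (!e->lock_for_write()) return nullptr;
        _write.fetch_add(1);
        return e;
    }
};

// Hypothetical helpers: one complete write or read, returning false if the ring refuses.
static bool write_one(Ring& r) { Element* e = r.get_for_write(); if (!e) return false; e->unlock_write(); return true; }
static bool read_one(Ring& r) { Element* e = r.get_for_read(); if (!e) return false; e->unlock_read(); return true; }

// Returns true if the interleaving leaves the ring stuck for both readers and writers.
bool replay_deadlocks() {
    Ring ring(2);
    Element* stale = ring.peek_read();       // R2 picks element 0, then is preempted
    if (!write_one(ring)) return false;      // W fills element 0, _write == 1
    if (!write_one(ring)) return false;      // W fills element 1, _write == 2
    if (!read_one(ring)) return false;       // R1 drains element 0, _read == 1
    if (!write_one(ring)) return false;      // W refills element 0, _write == 3
    Element* e = ring.claim_read(stale);     // R2 resumes: locks element 0, _read == 2
    if (!e) return false;
    e->unlock_read();                        // element 0 is Unused, element 1 is still Ready
    // _read now points to element 0 (Unused), _write to element 1 (Ready).
    return !read_one(ring) && !write_one(ring);
}
```

After the replay, element 1 is a hole: it was written but skipped by the readers, and neither index can ever move past the element it points to.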

So, create critical sections to make sure that the increment of _read/_write is done in the same semantic step as the actual lock.
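One possible reading of this advice is to guard each index with a mutex, so that picking the element, locking it and incrementing the index form one indivisible step. The sketch below is an assumption about what such a critical section could look like, not code from the answer; `LockedRing` and `fill_and_drain` are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <vector>

enum state : int { Unused, LockForWrite, Ready, LockForRead };

struct Element {
    std::atomic<state> _state{Unused};
    bool lock_for_read() { state s = Ready; return _state.compare_exchange_strong(s, LockForRead); }
    void unlock_read() { _state.store(Unused); }
    bool lock_for_write() { state s = Unused; return _state.compare_exchange_strong(s, LockForWrite); }
    void unlock_write() { _state.store(Ready); }
};

class LockedRing {
    std::vector<Element> _elems;
    std::int64_t _read = 0, _write = 0;     // plain integers: each is only touched under its own mutex
    std::mutex _read_mutex, _write_mutex;   // readers only contend with readers, writers with writers

public:
    explicit LockedRing(std::size_t capacity) : _elems(capacity) {}

    Element* get_for_read() {
        std::lock_guard<std::mutex> guard(_read_mutex);
        Element* e = &_elems[_read % _elems.size()];
        if (!e->lock_for_read()) return nullptr;
        ++_read; // same critical section as the lock, so the index cannot be stale
        return e;
    }
    Element* get_for_write() {
        std::lock_guard<std::mutex> guard(_write_mutex);
        Element* e = &_elems[_write % _elems.size()];
        if (!e->lock_for_write()) return nullptr;
        ++_write;
        return e;
    }
    // Releasing needs no mutex: the caller owns the element until the state changes.
    void release_read(Element* e) { e->unlock_read(); }
    void release_write(Element* e) { e->unlock_write(); }
};

// Fills the ring until it refuses a write, then drains it; returns the number of elements drained.
int fill_and_drain(LockedRing& ring) {
    while (Element* e = ring.get_for_write()) ring.release_write(e);
    int drained = 0;
    while (Element* e = ring.get_for_read()) { ring.release_read(e); ++drained; }
    return drained;
}
```

The mutexes are held only while an element is being claimed, not while it is being read or written, so the real work still runs in parallel. The trade-off against a lock-free variant is that a thread preempted inside the critical section delays every other thread of the same kind.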

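The other answer is right about the problem: if there is a delay between taking the read/write lock and incrementing the index, your threads can create "holes" in the sequence by reading and writing elements out of order. The fix is to verify that the index has not changed between the initial read and the increment, a la: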

class Element
{
    std::atomic<state> _state;
public:
    Element():_state(Unused){ }

    // a reader needs to successfully make the transition Ready => LockForRead
    bool lock_for_read() {
        state s = Ready;
        return _state.compare_exchange_strong(s, LockForRead);
    }
    void abort_read() { _state = Ready; }
    void unlock_read() { state s = Unused; _state.store(s); }

    // a writer needs to successfully make the transition Unused => LockForWrite
    bool lock_for_write() {
        state s = Unused;
        return _state.compare_exchange_strong(s, LockForWrite);
    }
    void abort_write() { _state = Unused; }
    void unlock_write() { state s = Ready;  _state.store(s); }
};

class Ring
{
    std::vector<Element> _elems;
    std::atomic<int64_t> _read, _write;

public:
    Ring(size_t capacity)
        : _elems(capacity), _read(0), _write(0) {}

    Element * get_for_read() {
        auto i = _read.load();
        Element * ret = &_elems[ i % _elems.size() ];
        if (ret->lock_for_read()) {
            // if success, the object belongs to the caller thread as reader
            if (_read.compare_exchange_strong(i, i + 1))
                return ret;
            // Woops, reading out of order.
            ret->abort_read();
        }
        return NULL;
    }
    Element * get_for_write() {
        auto i = _write.load();
        Element * ret = &_elems[ i % _elems.size() ];
        if (ret->lock_for_write()) {
            // if success, the object belongs to the caller thread as writer
            if (_write.compare_exchange_strong(i, i + 1))
                return ret;
            // Woops, writing out of order.
            ret->abort_write();
        }
        return NULL;
    }
    void release_read(Element* e) { e->unlock_read();}
    void release_write(Element* e) { e->unlock_write();}
};
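To see what the compare-and-swap on the index buys, the same kind of interleaving can be replayed on one thread against this version. The sketch below is illustrative only: `claim_read` is a hypothetical variant of get_for_read that takes the previously loaded index as a parameter so the replay can pause after the load, and `write_one`, `read_one` and `stale_claim_is_rejected` are hypothetical helpers. A capacity of 2 is assumed.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

enum state : int { Unused, LockForWrite, Ready, LockForRead };

struct Element {
    std::atomic<state> _state{Unused};
    bool lock_for_read() { state s = Ready; return _state.compare_exchange_strong(s, LockForRead); }
    void abort_read() { _state.store(Ready); }
    void unlock_read() { _state.store(Unused); }
    bool lock_for_write() { state s = Unused; return _state.compare_exchange_strong(s, LockForWrite); }
    void abort_write() { _state.store(Unused); }
    void unlock_write() { _state.store(Ready); }
};

struct Ring {
    std::vector<Element> _elems;
    std::atomic<std::int64_t> _read{0}, _write{0};
    explicit Ring(std::size_t capacity) : _elems(capacity) {}

    // Second half of get_for_read: i is the index loaded earlier by the caller.
    Element* claim_read(std::int64_t i) {
        Element* ret = &_elems[i % _elems.size()];
        if (ret->lock_for_read()) {
            if (_read.compare_exchange_strong(i, i + 1)) return ret;
            ret->abort_read(); // index moved since it was loaded: give the element back
        }
        return nullptr;
    }
    Element* get_for_read() { return claim_read(_read.load()); }

    Element* get_for_write() {
        std::int64_t i = _write.load();
        Element* ret = &_elems[i % _elems.size()];
        if (ret->lock_for_write()) {
            if (_write.compare_exchange_strong(i, i + 1)) return ret;
            ret->abort_write();
        }
        return nullptr;
    }
};

static bool write_one(Ring& r) { Element* e = r.get_for_write(); if (!e) return false; e->unlock_write(); return true; }
static bool read_one(Ring& r) { Element* e = r.get_for_read(); if (!e) return false; e->unlock_read(); return true; }

// Returns true if the stale claim is refused and the ring keeps working afterwards.
bool stale_claim_is_rejected() {
    Ring ring(2);
    std::int64_t stale = ring._read.load();         // R2 loads index 0, then is preempted
    if (!write_one(ring)) return false;             // W fills element 0, _write == 1
    if (!write_one(ring)) return false;             // W fills element 1, _write == 2
    if (!read_one(ring)) return false;              // R1 drains element 0, _read == 1
    if (!write_one(ring)) return false;             // W refills element 0, _write == 3
    if (ring.claim_read(stale) != nullptr) return false; // R2 resumes: lock succeeds, CAS on _read fails
    // Element 0 was handed back as Ready, so readers and the writer continue in order.
    return read_one(ring) && read_one(ring) && write_one(ring);
}
```

In this replay the stale reader briefly owns element 0 but then restores it to Ready, so no hole appears and _read still advances one element at a time.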