C++11 multi-reader / multi-writer queue using atomics for object state and perpetually incremented indexes
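I am using atomics and a circular buffer to implement a multi-reader, multi-writer object pool.
It is hard to investigate, because instrumenting the code makes the bug vanish!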
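The model
Producers (writer threads) request an Element from the Ring in order to 'prepare' the element. When done, the writer thread changes the element's state so that a reader can 'consume' it. After that, the element is available for writing again.
Consumers (reader threads) request an object from the Ring in order to 'read' it.
After a writer 'releases' the object, it is in state::Ready, i.e. available to be consumed by a reader thread.
A request may fail if no object is available, e.g. when the next free object in the Ring is not in state::Unused.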
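The 2 classes, Element and Ring
Element:
- To write, a writer thread must successfully swap the _state member from state::Unused to state::LockForWrite
- When done, the writer thread forces the state to state::Ready (it should be the only thread handling this element)
- To read, a reader thread must successfully swap the _state member from state::Ready to state::LockForRead
- When done, the reader thread forces the state to state::Unused (it should be the only thread handling this element)
Summary:
- writer lifecycle: state::Unused -> state::LockForWrite -> state::Ready
- reader lifecycle: state::Ready -> state::LockForRead -> state::Unused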
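The lifecycle above can be sketched in isolation as a tiny state machine. This is a minimal single-threaded illustration, not the code from the question: Slot, try_move and lifecycle_demo are names made up for the sketch.

```cpp
#include <atomic>
#include <cassert>

enum State : int { Unused, LockForWrite, Ready, LockForRead };

// Hypothetical helper type: one slot whose state only changes through
// an atomic compare-and-swap (to take it) or a plain store (to release it).
struct Slot {
    std::atomic<State> state{Unused};

    // Attempt the transition from -> to. Succeeds only if the slot
    // really was in state `from` at the moment of the swap.
    bool try_move(State from, State to) {
        return state.compare_exchange_strong(from, to);
    }
};

// Walks one slot through a full writer cycle followed by a reader cycle.
// Returns true if every transition behaved as the lifecycle predicts.
bool lifecycle_demo() {
    Slot s;
    bool ok = true;
    ok = ok && !s.try_move(Ready, LockForRead);    // nothing to read yet
    ok = ok &&  s.try_move(Unused, LockForWrite);  // writer takes the slot
    ok = ok && !s.try_move(Unused, LockForWrite);  // a second writer is refused
    s.state.store(Ready);                          // writer publishes
    ok = ok &&  s.try_move(Ready, LockForRead);    // reader takes the slot
    ok = ok && !s.try_move(Ready, LockForRead);    // a second reader is refused
    s.state.store(Unused);                         // reader hands it back
    ok = ok && s.state.load() == Unused;
    return ok;
}
```

Calling lifecycle_demo() from a test or from main should return true. The per-element transitions are sound on their own, which is why the deadlock has to come from somewhere else.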
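Ring
- has a vector of Element, treated as a circular buffer.
- std::atomic<int64_t> _read, _write; are the 2 indexes used to access the elements, via:
  _elems[ _write % _elems.size() ] for writers,
  _elems[ _read % _elems.size() ] for readers.
When a reader has successfully LockForRead an object, the _read index is incremented.
When a writer has successfully LockForWrite an object, the _write index is incremented.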
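To make the index scheme concrete, here is a small sketch of how a perpetually increasing index maps onto a slot. slot_of and lap_of are hypothetical helpers introduced only for illustration; the question's code does the modulo inline.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper: which slot does a perpetually increasing index refer to?
std::size_t slot_of(std::int64_t index, std::size_t capacity) {
    return static_cast<std::size_t>(index) % capacity;
}

// Hypothetical helper: how many full laps around the ring has this index made?
std::int64_t lap_of(std::int64_t index, std::size_t capacity) {
    return index / static_cast<std::int64_t>(capacity);
}
```

With a capacity of 10, index 23 refers to slot 3 on lap 2, and indexes 0 and 10 both refer to slot 0. Because the indexes never wrap back, two different index values never compare equal even when they name the same slot.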
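main:
We add some writer and reader threads to a vector, all sharing the same Ring. Each thread just tries to get_for_read or get_for_write an element and releases it afterwards.
Based on the Element transitions everything should be fine, but one can observe that the Ring gets blocked at some point: some element in the ring is in state::Ready with the _write % _elems.size() index pointing at it and, symmetrically, some element is in state::Unused with the _read % _elems.size() index pointing at it! Both together = deadlock.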
#include<atomic>
#include<vector>
#include<thread>
#include<iostream>
#include<cstdint>
#include<functional>
typedef enum : int
{
    Unused, LockForWrite, Ready, LockForRead
}state;
class Element
{
    std::atomic<state> _state;
public:
    Element():_state(Unused){ }
    // a reader needs to successfully make the transition Ready => LockForRead
    bool lock_for_read() { state s = Ready; return _state.compare_exchange_strong(s, LockForRead); }
    void unlock_read() { state s = Unused; _state.store(s); }
    // a writer needs to successfully make the transition Unused => LockForWrite
    bool lock_for_write() { state s = Unused; return _state.compare_exchange_strong(s, LockForWrite); }
    void unlock_write() { state s = Ready; _state.store(s); }
};
class Ring
{
    std::vector<Element> _elems;
    std::atomic<int64_t> _read, _write;
public:
    Ring(size_t capacity)
        : _elems(capacity), _read(0), _write(0) {}
    Element * get_for_read() {
        Element * ret = &_elems[ _read.load() % _elems.size() ];
        if (!ret->lock_for_read()) // if success, the object belongs to the caller thread as reader
            return nullptr;
        _read.fetch_add(1); // success! incr _read index
        return ret;
    }
    Element * get_for_write() {
        Element * ret = &_elems[ _write.load() % _elems.size() ];
        if (!ret->lock_for_write()) // if success, the object belongs to the caller thread as writer
            return nullptr;
        _write.fetch_add(1); // success! incr _write index
        return ret;
    }
    void release_read(Element* e) { e->unlock_read(); }
    void release_write(Element* e) { e->unlock_write(); }
};
int main()
{
    const int capacity = 10; // easy to process modulo
    std::atomic<bool> stop(false);
    Ring ring(capacity);
    std::function<void()> writer_job = [&]()
    {
        std::cout << "writer starting" << std::endl;
        Element * e;
        while (!stop)
        {
            if (!(e = ring.get_for_write()))
                continue;
            // do some real writer job ...
            ring.release_write(e);
        }
    };
    std::function<void()> reader_job = [&]()
    {
        std::cout << "reader starting" << std::endl;
        Element * e;
        while (!stop)
        {
            if (!(e = ring.get_for_read()))
                continue;
            // do some real reader job ...
            ring.release_read(e);
        }
    };
    int nb_writers = 1;
    int nb_readers = 2;
    std::vector<std::thread> threads;
    threads.reserve(nb_writers + nb_readers);
    std::cout << "adding writers" << std::endl;
    while (nb_writers--)
        threads.push_back(std::thread(writer_job));
    std::cout << "adding readers" << std::endl;
    while (nb_readers--)
        threads.push_back(std::thread(reader_job));
    // wait user key press, halt in debugger after 1 or 2 seconds
    // in order to reproduce problem and watch ring
    std::cin.get();
    stop = true;
    std::cout << "waiting all threads...\n";
    for (auto & th : threads)
        th.join();
    std::cout << "end" << std::endl;
}
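This "watch" debugger screenshot was taken after pausing the program 1 second into the run. As you can see, _read points to element 8, which is marked state::Unused, so no transition can unblock this state for the reader, except a writer; but the _write index points to element 0, which is in state::Ready!
My question: what did I miss? Structurally I am sure the sequence is correct, but I am missing some atomic trick...
OS tested: RHEL 5 / gcc 4.1.2, RHEL 7 / gcc 4.8, Win10 / MS Visual 2015, Win10 / MinGW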
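You do not have an atomic section around the increments of the two shared counters _read and _write.
That looks bad to me: you could end up switching another element without meaning to.
Imagine this scenario:
1 reader R1 and 1 writer W are happily cooperating.
Reader 2 (R2) executes: Element * ret = &_elems[ _read.load() % _elems.size() ]; and gets pushed off the CPU.
Now R1 and W are still playing together, so the positions of _read and _write are now arbitrary w.r.t. the element ret that R2 is pointing at.
At some point R2 gets scheduled again, and it so happens that *ret is readable (which is quite possible, R1 and W having gone around the ring a few times).
Ouch: as you can see, we will read it and increment _read, but _read has no relation to ret. This creates holes of a sort, i.e. elements that have not been read yet but sit below the _read index.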
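The scenario can be replayed deterministically on a single thread by splitting get_for_read into its two steps and interleaving them by hand. This is only a sketch under assumptions: the capacity of 4, the BuggyRing type and the helper names are invented for the replay, and the logic mirrors the question's code rather than reusing it.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

enum State : int { Unused, LockForWrite, Ready, LockForRead };

const int N = 4;  // small capacity chosen for the replay (assumption)

struct BuggyRing {
    std::atomic<State> elems[N];
    std::atomic<std::int64_t> read_idx{0}, write_idx{0};
    BuggyRing() { for (auto& e : elems) e.store(Unused); }

    // Step 1 of get_for_read: choose a slot from the current read index.
    int pick_read() const { return static_cast<int>(read_idx.load() % N); }

    // Step 2 of get_for_read: lock the chosen slot, then bump the index,
    // without checking that the index still matches the slot.
    bool lock_and_bump_read(int slot) {
        State s = Ready;
        if (!elems[slot].compare_exchange_strong(s, LockForRead)) return false;
        read_idx.fetch_add(1);
        return true;
    }

    // A whole read or write with no pause in between, release included.
    bool read_one() {
        int slot = pick_read();
        if (!lock_and_bump_read(slot)) return false;
        elems[slot].store(Unused);
        return true;
    }
    bool write_one() {
        int slot = static_cast<int>(write_idx.load() % N);
        State s = Unused;
        if (!elems[slot].compare_exchange_strong(s, LockForWrite)) return false;
        write_idx.fetch_add(1);
        elems[slot].store(Ready);
        return true;
    }
};

// Returns true if the replay ends with both the reader and the writer stuck.
bool replay_hole() {
    BuggyRing r;
    int stale = r.pick_read();                  // R2 picks slot 0, then is descheduled
    r.write_one();                              // W fills slot 0   (write_idx = 1)
    r.read_one();                               // R1 drains slot 0 (read_idx = 1)
    for (int k = 0; k < 4; ++k) r.write_one();  // W fills slots 1,2,3,0 (write_idx = 5)
    r.lock_and_bump_read(stale);                // R2 wakes: locks slot 0, read_idx becomes 2
    r.elems[stale].store(Unused);               // R2 releases; slot 1 was skipped
    r.read_one();                               // R1 drains slot 2 (read_idx = 3)
    r.read_one();                               // R1 drains slot 3 (read_idx = 4)
    // read_idx % N == 0 is Unused and write_idx % N == 1 is Ready: nobody can move.
    return !r.read_one() && !r.write_one();
}
```

At the end of the replay the reader is pointed at an Unused slot and the writer at a Ready slot, which is the same shape as the deadlock described in the question.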
So, create critical sections to ensure that the increment of _read/_write is done in the same semantic step as the actual lock.
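One way to read "critical section" here is a mutex per side, held while the slot is chosen, locked and the index is bumped. The sketch below assumes that reading; LockedRing, the slot-number return values and locked_ring_demo are choices made for this illustration and are not the question's API. Note that this gives up the lock-free property of the original design.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <vector>

enum State : int { Unused, LockForWrite, Ready, LockForRead };

class LockedRing {
    std::vector<std::atomic<State>> _elems;
    std::int64_t _read = 0, _write = 0;  // each only touched under its own mutex
    std::mutex _read_mx, _write_mx;
public:
    explicit LockedRing(std::size_t capacity) : _elems(capacity) {
        for (auto& e : _elems) e.store(Unused);
    }
    // Returns the slot number on success, -1 if the next slot is not Ready.
    int get_for_read() {
        std::lock_guard<std::mutex> guard(_read_mx);
        std::size_t slot = static_cast<std::size_t>(_read) % _elems.size();
        State s = Ready;
        if (!_elems[slot].compare_exchange_strong(s, LockForRead)) return -1;
        ++_read;  // same critical section as the lock, so no stale slot is possible
        return static_cast<int>(slot);
    }
    // Returns the slot number on success, -1 if the next slot is not Unused.
    int get_for_write() {
        std::lock_guard<std::mutex> guard(_write_mx);
        std::size_t slot = static_cast<std::size_t>(_write) % _elems.size();
        State s = Unused;
        if (!_elems[slot].compare_exchange_strong(s, LockForWrite)) return -1;
        ++_write;
        return static_cast<int>(slot);
    }
    void release_read(int slot)  { _elems[slot].store(Unused); }
    void release_write(int slot) { _elems[slot].store(Ready); }
};

// Fills and drains a 3-slot ring twice; returns the number of successful reads.
int locked_ring_demo() {
    LockedRing ring(3);
    int reads = 0;
    for (int round = 0; round < 2; ++round) {
        int s;
        while ((s = ring.get_for_write()) >= 0) ring.release_write(s);
        while ((s = ring.get_for_read()) >= 0) { ring.release_read(s); ++reads; }
    }
    return reads;
}
```

The demo is single-threaded and only checks the full/empty behaviour: each round writes 3 slots before the ring reports full, then reads 3 before it reports empty. Readers and writers still coordinate through the per-element state, so the two mutexes never need to be held together.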
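The answer above is right about the problem: if there is a delay between taking the read/write lock and incrementing the index, your threads can create "holes" in the sequence by reading and writing elements out of order. The fix is to verify that the index has not changed between the initial read and the increment, a la: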
class Element
{
    std::atomic<state> _state;
public:
    Element():_state(Unused){ }
    // a reader needs to successfully make the transition Ready => LockForRead
    bool lock_for_read() {
        state s = Ready;
        return _state.compare_exchange_strong(s, LockForRead);
    }
    // undo a successful lock_for_read when the index check fails
    void abort_read() { _state = Ready; }
    void unlock_read() { state s = Unused; _state.store(s); }
    // a writer needs to successfully make the transition Unused => LockForWrite
    bool lock_for_write() {
        state s = Unused;
        return _state.compare_exchange_strong(s, LockForWrite);
    }
    // undo a successful lock_for_write when the index check fails
    void abort_write() { _state = Unused; }
    void unlock_write() { state s = Ready; _state.store(s); }
};
class Ring
{
    std::vector<Element> _elems;
    std::atomic<int64_t> _read, _write;
public:
    Ring(size_t capacity)
        : _elems(capacity), _read(0), _write(0) {}
    Element * get_for_read() {
        auto i = _read.load();
        Element * ret = &_elems[ i % _elems.size() ];
        if (ret->lock_for_read()) {
            // if success, the object belongs to the caller thread as reader
            if (_read.compare_exchange_strong(i, i + 1))
                return ret;
            // Woops, reading out of order.
            ret->abort_read();
        }
        return nullptr;
    }
    Element * get_for_write() {
        auto i = _write.load();
        Element * ret = &_elems[ i % _elems.size() ];
        if (ret->lock_for_write()) {
            // if success, the object belongs to the caller thread as writer
            if (_write.compare_exchange_strong(i, i + 1))
                return ret;
            // Woops, writing out of order.
            ret->abort_write();
        }
        return nullptr;
    }
    void release_read(Element* e) { e->unlock_read(); }
    void release_write(Element* e) { e->unlock_write(); }
};
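To see what the index check buys, the stale-reader interleaving described in the first answer can be replayed by hand against the same verify-then-increment logic. This is a single-threaded sketch under assumptions: the capacity of 4, CheckedRing and its step-by-step helpers are invented for the replay and only mirror the logic of the Ring above.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

enum State : int { Unused, LockForWrite, Ready, LockForRead };

const int N = 4;  // small capacity chosen for the replay (assumption)

struct CheckedRing {
    std::atomic<State> elems[N];
    std::atomic<std::int64_t> read_idx{0}, write_idx{0};
    CheckedRing() { for (auto& e : elems) e.store(Unused); }

    // Step 1 of get_for_read: remember the index itself, not just the slot.
    std::int64_t pick_read() const { return read_idx.load(); }

    // Step 2: lock the slot for index i, then confirm read_idx is still i.
    // If it moved in the meantime, put the slot back to Ready and give up.
    bool lock_and_confirm_read(std::int64_t i) {
        std::atomic<State>& e = elems[i % N];
        State s = Ready;
        if (!e.compare_exchange_strong(s, LockForRead)) return false;
        if (read_idx.compare_exchange_strong(i, i + 1)) return true;
        e.store(Ready);  // same effect as abort_read()
        return false;
    }
    bool read_one() {
        std::int64_t i = pick_read();
        if (!lock_and_confirm_read(i)) return false;
        elems[i % N].store(Unused);
        return true;
    }
    bool write_one() {
        std::int64_t i = write_idx.load();
        std::atomic<State>& e = elems[i % N];
        State s = Unused;
        if (!e.compare_exchange_strong(s, LockForWrite)) return false;
        if (!write_idx.compare_exchange_strong(i, i + 1)) {
            e.store(Unused);  // same effect as abort_write()
            return false;
        }
        e.store(Ready);
        return true;
    }
};

// Returns how many elements can still be drained after the stale reader
// wakes up, or -1 if the stale reader was wrongly allowed through.
int replay_with_check() {
    CheckedRing r;
    std::int64_t stale = r.pick_read();         // R2 remembers index 0, then stalls
    r.write_one();                              // W fills slot 0   (write_idx = 1)
    r.read_one();                               // R1 drains slot 0 (read_idx = 1)
    for (int k = 0; k < 4; ++k) r.write_one();  // W fills slots 1,2,3,0 (write_idx = 5)
    bool stale_ok = r.lock_and_confirm_read(stale);  // index moved: R2 must back off
    int drained = 0;
    while (r.read_one()) ++drained;             // indexes 1..4 are read in order
    return stale_ok ? -1 : drained;
}
```

In this replay the stale reader locks slot 0, sees that the read index is no longer the one it loaded, restores the slot to Ready and fails. The four pending elements are then drained in order, so no hole is left behind.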