在这个无锁 SPSC 环形缓冲区队列中,每个原子的内存顺序是否正确?
Are memory orders for each atomic correct in this lock-free SPSC ring buffer queue?
我有一个环形缓冲区,如下所示:
template<class T>
class RingBuffer {
public:
bool Publish();
bool Consume(T& value);
bool IsEmpty(std::size_t head, std::size_t tail);
bool IsFull(std::size_t head, std::size_t tail);
private:
std::size_t Next(std::size_t slot);
std::vector<T> buffer_;
std::atomic<std::size_t> tail_{0};
std::atomic<std::size_t> head_{0};
static constexpr std::size_t kBufferSize{8};
};
此数据结构用于两个线程:发布者线程和消费者线程。下面列出了两个没有将内存命令传递给原子的主要函数:
bool Publish(T value) {
const size_t curr_head = head_.load(/* memory order */);
const size_t curr_tail = tail_.load(/* memory_order */);
if (IsFull(curr_head, curr_tail)) {
return false;
}
buffer_[curr_tail] = std::move(value);
tail_.store(Next(curr_tail) /*, memory order */);
return true;
}
bool Consume(T& value) {
const size_t curr_head = head_.load(/* memory order */);
const size_t curr_tail = tail_.load(/* memory order */);
if (IsEmpty(curr_head, curr_tail)) {
return false;
}
value = std::move(buffer_[curr_head]);
head_.store(Next(curr_head) /*, memory order */);
return true;
}
我知道,至少,我必须在 Publish()
函数中包含 tail_.store()
,在 std::memory_order::release
中包含 tail_.load()
,在 [=] 中包含 std::memory_order::acquire
19=] 函数创建 发生在 写入 buffer_
和读取 buffer_
之间的连接之前。此外,我可以将 std::memory_order::relaxed
传递给 Publish()
中的 tail_.load()
和 Consume()
中的 head_.load()
,因为同一个线程将看到对原子的最后一次写入。现在的功能是这样的:
bool Publish(T value) {
const size_t curr_head = head_.load(/* memory order */);
const size_t curr_tail = tail_.load(std::memory_order::relaxed);
if (IsFull(curr_head, curr_tail)) {
return false;
}
buffer_[curr_tail] = std::move(value);
tail_.store(Next(curr_tail), std::memory_order::release);
return true;
}
bool Consume(T& value) {
const size_t curr_head = head_.load(std::memory_order::relaxed);
const size_t curr_tail = tail_.load(std::memory_order::acquire);
if (IsEmpty(curr_head, curr_tail)) {
return false;
}
value = std::move(buffer_[curr_head]);
head_.store(Next(curr_head) /*, memory order */);
return true;
}
最后一步是将内存顺序放入剩余的对中:head_.load()
在 Publish()
和 head_.store()
在 Consume()
中。我必须在 Consume()
中的 head_.store()
之前执行 value = std::move(buffer_[curr_head]);
行,否则在缓冲区已满的情况下我将进行数据竞争,因此,至少,我必须通过 std::memory_order::release
到该存储操作以避免重新排序。但是我是否必须在 Publish()
函数中将 std::memory_order::acquire
放入 head_.load()
中?我知道这将有助于 head_.load()
在合理的时间内看到 head_.store()
,这与 std::memory_order::relaxed
不同,但如果我不需要这种更短时间的保证来看到商店操作的副作用,我可以放宽记忆顺序吗?如果我不能,那为什么?完成代码:
bool Publish(T value) {
const size_t curr_head = head_.load(std::memory_order::relaxed); // or acquire?
const size_t curr_tail = tail_.load(std::memory_order::relaxed);
if (IsFull(curr_head, curr_tail)) {
return false;
}
buffer_[curr_tail] = std::move(value);
tail_.store(Next(curr_tail), std::memory_order::release);
return true;
}
bool Consume(T& value) {
const size_t curr_head = head_.load(std::memory_order::relaxed);
const size_t curr_tail = tail_.load(std::memory_order::acquire);
if (IsEmpty(curr_head, curr_tail)) {
return false;
}
value = std::move(buffer_[curr_head]);
head_.store(Next(curr_head), std::memory_order::release);
return true;
}
每个原子的内存顺序是否正确?我对每个原子变量中每个内存顺序的使用的解释是否正确?
以前的答案可能有助于作为背景:
c++, std::atomic, what is std::memory_order and how to use them?
https://bartoszmilewski.com/2008/12/01/c-atomics-and-memory-ordering/
首先,您描述的系统称为单一生产者 - 单一消费者队列。您可以随时查看此 container 的增强版本进行比较。我经常会检查 boost 代码,即使我在不允许 boost 的情况下工作。这是因为检查和理解一个稳定的解决方案会让您深入了解您可能遇到的问题(他们为什么那样做?哦,我明白了 - 等等)。
鉴于您的设计,并且编写了许多类似的容器,我会说您的设计必须小心区分空的和满的。如果你使用经典的 {begin,end} 对,你会遇到由于包装
{begin, begin+size} == {begin, begin} == empty
好的,回到同步问题。
考虑到顺序只影响重新排序,在Publish 中使用release 似乎是教科书式的flag 用法。在容器的大小增加之前没有任何东西会读取该值,因此您不关心值本身的写入顺序是否以随机顺序发生,您只关心在计数增加之前必须完全写入该值.所以我同意,您在 Publish 函数中正确使用了标志。
我确实质疑在 Consume 中是否需要“释放”,但如果您要移出队列,并且这些移动会产生副作用,则可能需要这样做。我会说,如果您追求原始速度,那么可能值得制作第二个版本,它专门用于 trivial objects,使用宽松的顺序来增加头部。
您也可以像 push/pop 一样考虑就地 new/delete。虽然大多数移动都会使对象处于空状态,但标准只要求它在移动后保持有效状态。移动后显式删除对象可能会避免以后出现模糊的错误。
您可能会争辩说 consume 中的两个原子负载可能是 memory_order_consume。这放宽了限制说“我不在乎它们加载的顺序,只要它们在使用时都已加载”。尽管我怀疑它在实践中是否会产生任何收益。我也对这个建议感到紧张,因为当我查看增强版本时,它非常接近您所拥有的版本。
https://www.boost.org/doc/libs/1_66_0/boost/lockfree/spsc_queue.hpp
template <typename Functor>
bool consume_one(Functor & functor, T * buffer, size_t max_size)
{
const size_t write_index = write_index_.load(memory_order_acquire);
const size_t read_index = read_index_.load(memory_order_relaxed);
if ( empty(write_index, read_index) )
return false;
T & object_to_consume = buffer[read_index];
functor( object_to_consume );
object_to_consume.~T();
size_t next = next_index(read_index, max_size);
read_index_.store(next, memory_order_release);
return true;
}
总的来说,尽管您的方法看起来不错并且与增强版本非常相似。但是......如果我是你,我可能会逐行浏览提升版本,看看它有什么不同。很容易出错。
编辑:
抱歉,我刚刚注意到您的代码中 memory_order_acquire/memory_order_relaxed 标记的顺序错误。
你应该让最后一个写的是“发布”,第一个读的是“获取”。这不会显着影响行为,但会使其更易于阅读。 (开始同步,结束同步)
回复评论:
正如@user17732522所暗示的那样,复制操作也是写入,因此对普通对象的优化不会同步,对普通对象的优化将引入U/B(该死的很容易出错)
为了理解这个问题的正确std::memory_order
,让我们考虑一下是否只有一个线程而不是生产者和消费者线程。
在单线程场景中,bool Publish(T value)
将看到前一个 bool Consume(T& value)
以相同顺序执行的所有操作,类似地,bool Consume(T& value)
将看到前一个 bool Publish(T value)
以相同顺序执行的所有操作。
所以在多线程场景下,发布者和消费者线程必须以类似的方式同步。可以通过memory barriers
.
实现同步
Consume function release
atomic store head_.store(Next(curr_head), std::memory_order::release);
保证在 store
之前发出的操作将在它之前执行并且不会跨越它,并且发布可以通过 acquire
原子加载同步 head_
变量,如果 Publish 可以看到 head_
则可以保证它会看到在它之前执行的所有操作,
bool Publish(T value) {
const size_t curr_head = head_.load(std::memory_order::acquire);
const size_t curr_tail = tail_.load(std::memory_order::relaxed);
if (IsFull(curr_head, curr_tail)) {
return false;
}
buffer_[curr_tail] = std::move(value);
tail_.store(Next(curr_tail), std::memory_order::release);
return true;
}
同样,消费可以通过 tail_
变量的 acquire
原子加载与发布 release
原子存储 tail_.store(Next(curr_tail), std::memory_order::release);
同步,如果消费可以看到 tail_
它保证看到之前执行的所有操作。
bool Consume(T& value) {
const size_t curr_tail = tail_.load(std::memory_order::acquire);
const size_t curr_head = head_.load(std::memory_order::relaxed);
if (IsEmpty(curr_head, curr_tail)) {
return false;
}
value = std::move(buffer_[curr_head]);
head_.store(Next(curr_head), std::memory_order::release);
return true;
}
我有一个环形缓冲区,如下所示:
template<class T>
class RingBuffer {
public:
bool Publish();
bool Consume(T& value);
bool IsEmpty(std::size_t head, std::size_t tail);
bool IsFull(std::size_t head, std::size_t tail);
private:
std::size_t Next(std::size_t slot);
std::vector<T> buffer_;
std::atomic<std::size_t> tail_{0};
std::atomic<std::size_t> head_{0};
static constexpr std::size_t kBufferSize{8};
};
此数据结构用于两个线程:发布者线程和消费者线程。下面列出了两个没有将内存命令传递给原子的主要函数:
bool Publish(T value) {
const size_t curr_head = head_.load(/* memory order */);
const size_t curr_tail = tail_.load(/* memory_order */);
if (IsFull(curr_head, curr_tail)) {
return false;
}
buffer_[curr_tail] = std::move(value);
tail_.store(Next(curr_tail) /*, memory order */);
return true;
}
bool Consume(T& value) {
const size_t curr_head = head_.load(/* memory order */);
const size_t curr_tail = tail_.load(/* memory order */);
if (IsEmpty(curr_head, curr_tail)) {
return false;
}
value = std::move(buffer_[curr_head]);
head_.store(Next(curr_head) /*, memory order */);
return true;
}
我知道,至少,我必须在 Publish()
函数中包含 tail_.store()
,在 std::memory_order::release
中包含 tail_.load()
,在 [=] 中包含 std::memory_order::acquire
19=] 函数创建 发生在 写入 buffer_
和读取 buffer_
之间的连接之前。此外,我可以将 std::memory_order::relaxed
传递给 Publish()
中的 tail_.load()
和 Consume()
中的 head_.load()
,因为同一个线程将看到对原子的最后一次写入。现在的功能是这样的:
bool Publish(T value) {
const size_t curr_head = head_.load(/* memory order */);
const size_t curr_tail = tail_.load(std::memory_order::relaxed);
if (IsFull(curr_head, curr_tail)) {
return false;
}
buffer_[curr_tail] = std::move(value);
tail_.store(Next(curr_tail), std::memory_order::release);
return true;
}
bool Consume(T& value) {
const size_t curr_head = head_.load(std::memory_order::relaxed);
const size_t curr_tail = tail_.load(std::memory_order::acquire);
if (IsEmpty(curr_head, curr_tail)) {
return false;
}
value = std::move(buffer_[curr_head]);
head_.store(Next(curr_head) /*, memory order */);
return true;
}
最后一步是将内存顺序放入剩余的对中:head_.load()
在 Publish()
和 head_.store()
在 Consume()
中。我必须在 Consume()
中的 head_.store()
之前执行 value = std::move(buffer_[curr_head]);
行,否则在缓冲区已满的情况下我将进行数据竞争,因此,至少,我必须通过 std::memory_order::release
到该存储操作以避免重新排序。但是我是否必须在 Publish()
函数中将 std::memory_order::acquire
放入 head_.load()
中?我知道这将有助于 head_.load()
在合理的时间内看到 head_.store()
,这与 std::memory_order::relaxed
不同,但如果我不需要这种更短时间的保证来看到商店操作的副作用,我可以放宽记忆顺序吗?如果我不能,那为什么?完成代码:
bool Publish(T value) {
const size_t curr_head = head_.load(std::memory_order::relaxed); // or acquire?
const size_t curr_tail = tail_.load(std::memory_order::relaxed);
if (IsFull(curr_head, curr_tail)) {
return false;
}
buffer_[curr_tail] = std::move(value);
tail_.store(Next(curr_tail), std::memory_order::release);
return true;
}
bool Consume(T& value) {
const size_t curr_head = head_.load(std::memory_order::relaxed);
const size_t curr_tail = tail_.load(std::memory_order::acquire);
if (IsEmpty(curr_head, curr_tail)) {
return false;
}
value = std::move(buffer_[curr_head]);
head_.store(Next(curr_head), std::memory_order::release);
return true;
}
每个原子的内存顺序是否正确?我对每个原子变量中每个内存顺序的使用的解释是否正确?
以前的答案可能有助于作为背景:
c++, std::atomic, what is std::memory_order and how to use them?
https://bartoszmilewski.com/2008/12/01/c-atomics-and-memory-ordering/
首先,您描述的系统称为单一生产者 - 单一消费者队列。您可以随时查看此 container 的增强版本进行比较。我经常会检查 boost 代码,即使我在不允许 boost 的情况下工作。这是因为检查和理解一个稳定的解决方案会让您深入了解您可能遇到的问题(他们为什么那样做?哦,我明白了 - 等等)。 鉴于您的设计,并且编写了许多类似的容器,我会说您的设计必须小心区分空的和满的。如果你使用经典的 {begin,end} 对,你会遇到由于包装
{begin, begin+size} == {begin, begin} == empty
好的,回到同步问题。
考虑到顺序只影响重新排序,在Publish 中使用release 似乎是教科书式的flag 用法。在容器的大小增加之前没有任何东西会读取该值,因此您不关心值本身的写入顺序是否以随机顺序发生,您只关心在计数增加之前必须完全写入该值.所以我同意,您在 Publish 函数中正确使用了标志。
我确实质疑在 Consume 中是否需要“释放”,但如果您要移出队列,并且这些移动会产生副作用,则可能需要这样做。我会说,如果您追求原始速度,那么可能值得制作第二个版本,它专门用于 trivial objects,使用宽松的顺序来增加头部。
您也可以像 push/pop 一样考虑就地 new/delete。虽然大多数移动都会使对象处于空状态,但标准只要求它在移动后保持有效状态。移动后显式删除对象可能会避免以后出现模糊的错误。
您可能会争辩说 consume 中的两个原子负载可能是 memory_order_consume。这放宽了限制说“我不在乎它们加载的顺序,只要它们在使用时都已加载”。尽管我怀疑它在实践中是否会产生任何收益。我也对这个建议感到紧张,因为当我查看增强版本时,它非常接近您所拥有的版本。 https://www.boost.org/doc/libs/1_66_0/boost/lockfree/spsc_queue.hpp
template <typename Functor>
bool consume_one(Functor & functor, T * buffer, size_t max_size)
{
const size_t write_index = write_index_.load(memory_order_acquire);
const size_t read_index = read_index_.load(memory_order_relaxed);
if ( empty(write_index, read_index) )
return false;
T & object_to_consume = buffer[read_index];
functor( object_to_consume );
object_to_consume.~T();
size_t next = next_index(read_index, max_size);
read_index_.store(next, memory_order_release);
return true;
}
总的来说,尽管您的方法看起来不错并且与增强版本非常相似。但是......如果我是你,我可能会逐行浏览提升版本,看看它有什么不同。很容易出错。
编辑: 抱歉,我刚刚注意到您的代码中 memory_order_acquire/memory_order_relaxed 标记的顺序错误。 你应该让最后一个写的是“发布”,第一个读的是“获取”。这不会显着影响行为,但会使其更易于阅读。 (开始同步,结束同步)
回复评论: 正如@user17732522所暗示的那样,复制操作也是写入,因此对普通对象的优化不会同步,对普通对象的优化将引入U/B(该死的很容易出错)
为了理解这个问题的正确std::memory_order
,让我们考虑一下是否只有一个线程而不是生产者和消费者线程。
在单线程场景中,bool Publish(T value)
将看到前一个 bool Consume(T& value)
以相同顺序执行的所有操作,类似地,bool Consume(T& value)
将看到前一个 bool Publish(T value)
以相同顺序执行的所有操作。
所以在多线程场景下,发布者和消费者线程必须以类似的方式同步。可以通过memory barriers
.
实现同步
Consume function release
atomic store head_.store(Next(curr_head), std::memory_order::release);
保证在 store
之前发出的操作将在它之前执行并且不会跨越它,并且发布可以通过 acquire
原子加载同步 head_
变量,如果 Publish 可以看到 head_
则可以保证它会看到在它之前执行的所有操作,
bool Publish(T value) {
const size_t curr_head = head_.load(std::memory_order::acquire);
const size_t curr_tail = tail_.load(std::memory_order::relaxed);
if (IsFull(curr_head, curr_tail)) {
return false;
}
buffer_[curr_tail] = std::move(value);
tail_.store(Next(curr_tail), std::memory_order::release);
return true;
}
同样,消费可以通过 tail_
变量的 acquire
原子加载与发布 release
原子存储 tail_.store(Next(curr_tail), std::memory_order::release);
同步,如果消费可以看到 tail_
它保证看到之前执行的所有操作。
bool Consume(T& value) {
const size_t curr_tail = tail_.load(std::memory_order::acquire);
const size_t curr_head = head_.load(std::memory_order::relaxed);
if (IsEmpty(curr_head, curr_tail)) {
return false;
}
value = std::move(buffer_[curr_head]);
head_.store(Next(curr_head), std::memory_order::release);
return true;
}