无锁单个 producer/single 消费者循环缓冲区

Lock free single producer/single consumer circular buffer

我一直在查看 this website 上的无锁单个 producer/single 消费者循环缓冲区,当时我无法弄清楚为什么需要特定的内存屏障。 我已经仔细阅读了一百遍 the standard rules about memory order 但我不明白我错过了什么。

使用此实现,只有一个唯一线程可以调用 push() 函数和另一个唯一线程可以调用 pop() 函数。

这里是 Producer 代码:

bool push(const Element& item)
{       
  const auto current_tail = _tail.load(std::memory_order_relaxed);  //(1)
  const auto next_tail = increment(current_tail);

  if(next_tail != _head.load(std::memory_order_acquire))            //(2)               
  {     
    _array[current_tail] = item;                                    //(3)
    _tail.store(next_tail, std::memory_order_release);              //(4)
    return true;
  }
  return false; // full queue
}

这里是 Consumer 代码:

bool pop(Element& item)
{
  const auto current_head = _head.load(std::memory_order_relaxed);    //(1)
  if(current_head == _tail.load(std::memory_order_acquire))           //(2)
    return false; // empty queue

  item = _array[current_head];                                       //(3)
  _head.store(increment(current_head), std::memory_order_release);   //(4)
  return true;
}

我理解为什么 Producer (4)Consumer (2) 语句是绝对需要的,这是因为我们必须确保所有 发生在 之前的写入一旦 consumer 看到存储的值,Producer(4) released store 将是可见的副作用。

我也明白为什么需要Consumer (4)语句,这是为了确保在执行Consumer (4)存储之前执行Consumer (3)加载。

问题

内存屏障阻止 CPU 对 Element 对象的访问重新排序,这些访问未使用互锁,跨越对队列结构的访问(此处使用索引实现,但指针同样可行).

使用您的编号,必须在 (2) 和 (4) 之间执行 (3),并且内存屏障提供了这一点。

您在 Producer 中询问的 (2)-vs-(3) 的确切情况可防止在队列已满时推测性地覆盖有效数据(建议的站点与有效数据重叠)。如果没有屏障,即使条件失败时原始数据将从生产者线程的角度恢复,中间值可能会短暂地对消费者可见。

我们需要证明

_array[current_tail] = item; // push(3)

符合 (current_head == current_tail)

之后执行
item = _array[current_head]; // pop(3)

完成。我们可以覆盖单元格,只有在它的数据已经复制到 item

之后

_head.load(std::memory_order_acquire) // push(2)

同步
_head.store(increment(current_head), std::memory_order_release);   //pop(4)

通过发布-获取排序:

发生在原子存储释放(pop(4))之前的所有内存写入(pop(3)_head_head.

上完成原子加载获取( push(2) )后,副作用就会变得可见

so Producer代码在push(2)完成后,保证能看到pop(3)的结果。这意味着来自 _array[current_head] 的数据被复制到项目并且此操作的结果在 push(2) 之后对生产者代码可见,因此 _array[current_head] 已经空闲。

来自 memory_order_acquire 负载描述的另一侧 - 在此负载之前,当前线程中的读取或写入 (push(3)) 无法重新排序。所以 push(3) 将在 push(2) 加载完成后执行,但此时 pop(3) 已经完成

item = _array[current_head];                                        //pop(3)
_head.store(increment(current_head), std::memory_order_release);    //pop(4)
-----
    _head.load(std::memory_order_acquire);                          //push(2)
    _array[current_tail] = item;                                    //push(3)