Troubles with simple Lock-Free MPSC Ring Buffer

I am trying to implement an array-based ring buffer that is thread-safe for multiple producers and a single consumer. The main idea is to have atomic head and tail indices. When pushing an element onto the queue, the head is incremented atomically to reserve a slot in the buffer:

#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <thread>
#include <vector>

template <class T> class MPSC {
private:
  int MAX_SIZE;

  std::atomic<int> head{0}; ///< index of first free slot
  std::atomic<int> tail{0}; ///< index of first occupied slot

  std::unique_ptr<T[]> data;
  std::unique_ptr<std::atomic<bool>[]> valid; ///< indicates whether data at an
                                              ///< index has been fully written

  /// Compute next index modulo size.
  inline int advance(int x) { return (x + 1) % MAX_SIZE; }

public:
  explicit MPSC(int size) {
    if (size <= 0)
      throw std::invalid_argument("size must be greater than 0");

    MAX_SIZE = size + 1;
    data = std::make_unique<T[]>(MAX_SIZE);
    valid = std::make_unique<std::atomic<bool>[]>(MAX_SIZE);
  }

  /// Add an element to the queue.
  ///
  /// If the queue is full, this method blocks until a slot is available for
  /// writing. This method is not starvation-free, i.e. it is possible that one
  /// thread always fills up the queue and prevents others from pushing.
  void push(const T &msg) {
    int idx;
    int next_idx;
    int k = 100;
    do {
      idx = head;
      next_idx = advance(idx);

      while (next_idx == tail) {     // queue is full
        k = k >= 100000 ? k : k * 2; // exponential backoff
        std::this_thread::sleep_for(std::chrono::nanoseconds(k));
      } // spin

    } while (!head.compare_exchange_weak(idx, next_idx));

    if (valid[idx])
      // this throws, suggesting that two threads are writing to the same
      // index. I have no idea how this is possible.
      throw std::runtime_error("message slot already written");

    data[idx] = msg;
    valid[idx] = true; // this was set to false by the reader,
                       // set it to true to indicate completed data write
  }

  /// Read an element from the queue.
  ///
  /// If the queue is empty, this method blocks until a message is available.
  /// This method is only safe to be called from one single reader thread.
  T pop() {
    int k = 100;
    while (is_empty() || !valid[tail]) {
      k = k >= 100000 ? k : k * 2;
      std::this_thread::sleep_for(std::chrono::nanoseconds(k));
    } // spin
    T res = data[tail];
    valid[tail] = false;
    tail = advance(tail);
    return res;
  }

  bool is_full() { return (head + 1) % MAX_SIZE == tail; }

  bool is_empty() { return head == tail; }
};

Under heavy contention, some messages get overwritten by other threads. So there must be something fundamentally wrong with what I am doing here.

What seems to be happening is that two threads are acquiring the same index to write their data to. Why could that be?

Even if a producer pauses before writing its data, the tail cannot be incremented past this thread's idx, so no other thread should be able to overtake it and claim the same idx.

EDIT

At the risk of posting too much code, here is a simplified program that reproduces the problem. It sends some increasing numbers from multiple threads and checks whether the consumer receives all of them:

#include "mpsc.hpp" // or whatever; the above queue
#include <thread>
#include <iostream>

int main() {
  static constexpr int N_THREADS = 10; ///< number of threads
  static constexpr int N_MSG = 1E+5;   ///< number of messages per thread

  struct msg {
    int t_id;
    int i;
  };

  MPSC<msg> q(N_THREADS / 2);

  std::thread threads[N_THREADS];

  // consumer
  threads[0] = std::thread([&q] {
    int expected[N_THREADS] {};

    for (int i = 0; i < N_MSG * (N_THREADS - 1); ++i) {
      msg m = q.pop();
      std::cout << "Got message from T-" << m.t_id << ": " << m.i << std::endl;
      if (expected[m.t_id] != m.i) {
        std::cout << "T-" << m.t_id << " unexpected msg " << m.i << "; expected " << expected[m.t_id] << std::endl;
        return -1;
      }
      expected[m.t_id] = m.i + 1;
    }
    return 0; // the early "return -1" above makes the deduced return type int
  });

  // producers
  for (int id = 1; id < N_THREADS; ++id) {
    threads[id] = std::thread([id, &q] {
      for (int i = 0; i < N_MSG; ++i) {
        q.push(msg{id, i});
      }
    });
  }

  for (auto &t : threads)
    t.join();
}

I am trying to implement an array-based ring buffer that is thread-safe for multiple producers and a single consumer.

I am assuming you are doing this as a learning exercise. If you want to solve an actual problem, then implementing a lock-free queue yourself is most likely the wrong approach.
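For instance (my own illustration, not something the answer prescribes), Boost.Lockfree ships a multi-producer queue whose push and pop never block and instead report success through their return value; the 128-node capacity and the thread counts below are arbitrary:

#include <boost/lockfree/queue.hpp>
#include <iostream>
#include <thread>
#include <vector>

int main() {
  // Queue with 128 preallocated nodes; push()/pop() never block and
  // report success/failure through their return value.
  boost::lockfree::queue<int> q(128);

  std::vector<std::thread> producers;
  for (int id = 0; id < 4; ++id)
    producers.emplace_back([&q, id] {
      for (int i = 0; i < 1000; ++i)
        while (!q.push(id * 1000 + i)) // retry if no free node is available
          std::this_thread::yield();
    });

  std::thread consumer([&q] {
    int value, received = 0;
    while (received < 4 * 1000) {      // expect every produced message
      if (q.pop(value))
        ++received;
      else
        std::this_thread::yield();
    }
    std::cout << "received " << received << " messages\n";
  });

  for (auto &t : producers)
    t.join();
  consumer.join();
}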

What seems to be happening is that two threads are acquiring the same index to write their data to. Why could that be?

The producers' spin-wait combined with the outer CAS loop does not work the way you intend:

do {
  idx = head;
  next_idx = advance(idx);

  while (next_idx == tail) {     // queue is full
    k = k >= 100000 ? k : k * 2; // exponential backoff
    std::this_thread::sleep_for(std::chrono::nanoseconds(k));
  } // spin

  //
  // ... at this point, before the CAS below runs, all other threads
  // (the remaining producers and the single consumer) can make
  // arbitrary progress and move head and tail ...
  //

} while (!head.compare_exchange_weak(idx, next_idx));

By the time the CAS happens the queue may be full again, because those checks are performed independently. In addition, the CAS can succeed even though other threads have advanced head in the meantime: they may have moved it all the way around the buffer so that it exactly matches idx again (an ABA-style problem with wrapping indices).
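Concretely (my illustration, not part of the original wording): a producer reads head == idx, passes the full check, and is preempted; the other producers and the consumer then push and pop enough elements that head wraps around the small buffer and lands on idx again, so the stale compare_exchange_weak succeeds and reserves a slot that the full check never reasoned about. One way to close both gaps is sketched below, purely as an illustration and not as drop-in code for the class above: head and tail become monotonically increasing 64-bit counters that are reduced modulo the capacity only when indexing the arrays (so a stale head value cannot reappear after a wrap-around), and the full check is re-evaluated against the same head snapshot that the CAS uses. The names MPSC2 and capacity are mine, and pop() gets the matching counter-based changes:

#include <atomic>
#include <cstdint>
#include <memory>
#include <thread>

template <class T> class MPSC2 {
  const std::uint64_t capacity;               ///< number of usable slots
  std::atomic<std::uint64_t> head{0};         ///< next slot to reserve (producers)
  std::atomic<std::uint64_t> tail{0};         ///< next slot to read (consumer)
  std::unique_ptr<T[]> data;
  std::unique_ptr<std::atomic<bool>[]> valid; ///< true once a slot is fully written

public:
  explicit MPSC2(std::uint64_t cap)
      : capacity(cap), data(std::make_unique<T[]>(cap)),
        valid(std::make_unique<std::atomic<bool>[]>(cap)) {}

  void push(const T &msg) {
    std::uint64_t idx;
    for (;;) {
      idx = head.load(std::memory_order_relaxed);
      // Full check and reservation now agree on the same snapshot of head.
      if (idx - tail.load(std::memory_order_acquire) >= capacity) {
        std::this_thread::yield();            // full (or stale idx): retry
        continue;
      }
      if (head.compare_exchange_weak(idx, idx + 1, std::memory_order_acq_rel))
        break;                                // slot idx is now ours
    }
    data[idx % capacity] = msg;
    valid[idx % capacity].store(true, std::memory_order_release);
  }

  T pop() {                                   // single consumer only
    const std::uint64_t idx = tail.load(std::memory_order_relaxed);
    while (!valid[idx % capacity].load(std::memory_order_acquire))
      std::this_thread::yield();              // empty, or writer not finished
    T res = data[idx % capacity];
    valid[idx % capacity].store(false, std::memory_order_relaxed);
    tail.store(idx + 1, std::memory_order_release);
    return res;
  }
};

With 64-bit counters a wrap-around is not a practical concern, and because the full check and the compare_exchange_weak act on the same idx snapshot, a successful CAS really does reserve the slot the check validated. Extending this to multiple consumers would additionally need per-slot sequence counters in the style of Dmitry Vyukov's bounded MPMC queue.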