具有单一生产者单一消费者的无锁循环缓冲区

Lockless circular buffer with single producer singular consumer

我有一个绝不能锁定或分配内存的消费者线程,以及一个可以的生产者线程。 我想实现一个两个位置的循环缓冲区,以便能够从生产者向消费者线程提供数据,只要没有新数据可供使用,消费者就会重新使用已经可用的数据。

这是我目前想出的:

bool newDataAvailable = false;
bool bufferEmpty = true;

foo* currentData = new foo();
foo* newData = new foo();

void consumer() {
  while(true) {
    currentData->doSomething();
    if(newDataAvailable) {
      foo* tmp = currentData;

      // Objects are swapped so the old one can be reused without additional allocations
      currentData = newData;
      newData = tmp;
      newDataAvailable = false;
      bufferEmpty = true;
    }
  }
}

void producer() {
  while(true) {
    while(!bufferEmpty) { wait(); }
    newData->init();
    bufferEmpty = false;
    newDataAvailable = true;
  }
}

这种天真的实现可以吗?我知道读取和写入变量可以是非原子的,所以我应该使用原子存储,但那些会导致锁定。这里需要使用原子存储吗? 另外,我想消除生产者中的主动等待,我想我可以使用 std::condition_variable,但它们需要使用互斥锁,而我负担不起。

在不使用互斥量的情况下编写共享变量的多线程代码很难正确。 参见 An Introduction to Lock-Free Programming, Lock Free Buffer

如果你绝对必须避免使用互斥锁,那么我强烈建议使用预制的无锁队列,例如Boost.lockfree or MPMCQueue 作为轻型非增强替代品。

I know reading and writing to variables can be non-atomic, so I should use an atomic storage, but those can cause locks.

std::atomic 通常对所有原始类型(不超过 cpu 的本机大小)都是无锁的(不使用互斥体)。 您可以通过调用 std::atomic<T>::is_lock_free

检查 std::atomic 是否会对给定类型使用互斥锁

Is the use of atomic storage needed here?

是的,绝对是。您要么需要使用互斥锁,要么需要使用原子。

Also, I'd like to eliminate the active wait in the producer, and I thought I could use a std::condition_variable

当您不能使用互斥体时,您唯一的选择就是使用自旋锁。 如果在您的上下文中允许,您可以在自旋锁中使用 std::this_thread::yield() 来减少 CPU 负载。 (但是互斥量可能会更快)

编辑: 只有 2 个原子的潜在解决方案是:

std::atomic<foo*> currentData = new foo();
std::atomic<foo*> newData = new foo();

void consumer() {
    foo* activeData = currentData;
    while (true) {
        activeData->doSomething();
        foo* newItem = currentData;
        if (newItem != activeData) {
            newData = activeData;
            activeData = newItem;
        }
    }
}

void producer() {
    while (true) {
        foo* reusedData = newData;
        if (!reusedData)
            continue;
        newData = nullptr;
        reusedData->init();
        currentData = reusedData;
    }
}