无锁 "closable" MPSC 队列

lock-free "closable" MPSC queue

多生产者单消费者场景,除了消费发生一次,之后队列为"closed",不允许再工作。我有一个 MPSC 队列,所以我尝试将无锁算法添加到 "close" 队列中。我相信它是正确的并且通过了我的测试。问题是当我尝试优化内存顺序时它停止工作(我认为工作丢失了,例如在队列关闭后排队)。即使在具有 "kind of" 强大内存模型的 x64 上,即使只有一个生产者。

我试图微调内存顺序的尝试被注释掉了:

// thread-safe for multi producers single consumer use
// linked-list based, and so it's growable 
MPSC_queue work_queue;
std::atomic<bool> closed{ false };
std::atomic<int32_t> producers_num{ 0 };
bool produce(Work&& work)
{
    bool res = false;

    ++producers_num;
    // producers_num.fetch_add(1, std::memory_order_release);
    if (!closed)
    // if (!closed.load(std::memory_order_acquire))
    {
        work_queue.push(std::move(work));
        res = true;
    }
    --producers_num;
    // producers_num.fetch_sub(1, std::memory_order_release);

    return res;
}
void consume()
{
    closed = true;
    // closed.store(true, std::memory_order_release);

    while (producers_num != 0)
    // while (producers_num.load(std::memory_order_acquire) != 0)
        std::this_thread::yield();

    Work work;
    while (work_queue.pop(work))
        process(work);
}

我也在 producers_num 上尝试了 std::memory_order_acq_rel 的读-修改-写操作,也没有用。

加分题:

此算法与 MPSC 队列一起使用,它已经在内部进行了一些同步。将它们结合起来以获得更好的性能会很好。你知道 "closable" MPSC 队列的算法吗?

我认为 closed = true; 确实需要 seq_cst 以确保它对其他线程可见 您第一次检查 producers_num 之前.否则可以这样排序:

  • 制作人:++producers_num;
  • 消费者:producers_num == 0
  • 生产者:if (!closed)发现它仍然开放
  • 消费者:close.store(true, release) 变得全局可见。
  • 消费者:work_queue.pop(work)发现队列为空
  • 生产者:work_queue.push(std::move(work));在消费者停止寻找后将工作添加到队列。

如果您在返回之前让消费者检查 producers_num == 0,您仍然可以避免seq_cst,例如

    while (producers_num != 0)
    // while (producers_num.load(std::memory_order_acquire) != 0)
        std::this_thread::yield();

    do {
        Work work;
        while (work_queue.pop(work))
            process(work);
    } while(producers_num.load(acquire) != 0);
    // safe if pop included a full barrier, I think

我不是 100% 确定我有这个权利,但我认为在完全障碍后检查 producer_num 就足够了。

不过,生产方确实需要++producers_num;至少acq_rel,否则可以重新排序过去if (!closed)。 (在它之后,在 if(!closed) 之前的获取栅栏也可能有效)。


由于您只想使用队列一次,因此不需要回绕,而且可能会简单得多。就像一个原子生产者位置计数器,作者递增以声明一个位置,如果他们得到一个位置 > 大小,那么队列已满。不过我还没有考虑完整的细节。

这可能会为上述问题提供更清晰的解决方案,也许是让消费者查看该写入索引以查看是否有任何生产者