无锁 "closable" MPSC 队列
lock-free "closable" MPSC queue
多生产者单消费者场景,除了消费发生一次,之后队列为"closed",不允许再工作。我有一个 MPSC 队列,所以我尝试将无锁算法添加到 "close" 队列中。我相信它是正确的并且通过了我的测试。问题是当我尝试优化内存顺序时它停止工作(我认为工作丢失了,例如在队列关闭后排队)。即使在具有 "kind of" 强大内存模型的 x64 上,即使只有一个生产者。
我试图微调内存顺序的尝试被注释掉了:
// thread-safe for multi producers single consumer use
// linked-list based, and so it's growable
MPSC_queue work_queue;
std::atomic<bool> closed{ false };
std::atomic<int32_t> producers_num{ 0 };
bool produce(Work&& work)
{
bool res = false;
++producers_num;
// producers_num.fetch_add(1, std::memory_order_release);
if (!closed)
// if (!closed.load(std::memory_order_acquire))
{
work_queue.push(std::move(work));
res = true;
}
--producers_num;
// producers_num.fetch_sub(1, std::memory_order_release);
return res;
}
void consume()
{
closed = true;
// closed.store(true, std::memory_order_release);
while (producers_num != 0)
// while (producers_num.load(std::memory_order_acquire) != 0)
std::this_thread::yield();
Work work;
while (work_queue.pop(work))
process(work);
}
我也在 producers_num
上尝试了 std::memory_order_acq_rel
的读-修改-写操作,也没有用。
加分题:
此算法与 MPSC 队列一起使用,它已经在内部进行了一些同步。将它们结合起来以获得更好的性能会很好。你知道 "closable" MPSC 队列的算法吗?
我认为 closed = true;
确实需要 seq_cst 以确保它对其他线程可见 在 您第一次检查 producers_num
之前.否则可以这样排序:
- 制作人:
++producers_num;
- 消费者:
producers_num == 0
- 生产者:
if (!closed)
发现它仍然开放
- 消费者:
close.store(true, release)
变得全局可见。
- 消费者:
work_queue.pop(work)
发现队列为空
- 生产者:
work_queue.push(std::move(work));
在消费者停止寻找后将工作添加到队列。
如果您在返回之前让消费者检查 producers_num == 0
,您仍然可以避免seq_cst,例如
while (producers_num != 0)
// while (producers_num.load(std::memory_order_acquire) != 0)
std::this_thread::yield();
do {
Work work;
while (work_queue.pop(work))
process(work);
} while(producers_num.load(acquire) != 0);
// safe if pop included a full barrier, I think
我不是 100% 确定我有这个权利,但我认为在完全障碍后检查 producer_num
就足够了。
不过,生产方确实需要++producers_num;
至少acq_rel,否则可以重新排序过去if (!closed)
。 (在它之后,在 if(!closed)
之前的获取栅栏也可能有效)。
由于您只想使用队列一次,因此不需要回绕,而且可能会简单得多。就像一个原子生产者位置计数器,作者递增以声明一个位置,如果他们得到一个位置 > 大小,那么队列已满。不过我还没有考虑完整的细节。
这可能会为上述问题提供更清晰的解决方案,也许是让消费者查看该写入索引以查看是否有任何生产者
多生产者单消费者场景,除了消费发生一次,之后队列为"closed",不允许再工作。我有一个 MPSC 队列,所以我尝试将无锁算法添加到 "close" 队列中。我相信它是正确的并且通过了我的测试。问题是当我尝试优化内存顺序时它停止工作(我认为工作丢失了,例如在队列关闭后排队)。即使在具有 "kind of" 强大内存模型的 x64 上,即使只有一个生产者。
我试图微调内存顺序的尝试被注释掉了:
// thread-safe for multi producers single consumer use
// linked-list based, and so it's growable
MPSC_queue work_queue;
std::atomic<bool> closed{ false };
std::atomic<int32_t> producers_num{ 0 };
bool produce(Work&& work)
{
bool res = false;
++producers_num;
// producers_num.fetch_add(1, std::memory_order_release);
if (!closed)
// if (!closed.load(std::memory_order_acquire))
{
work_queue.push(std::move(work));
res = true;
}
--producers_num;
// producers_num.fetch_sub(1, std::memory_order_release);
return res;
}
void consume()
{
closed = true;
// closed.store(true, std::memory_order_release);
while (producers_num != 0)
// while (producers_num.load(std::memory_order_acquire) != 0)
std::this_thread::yield();
Work work;
while (work_queue.pop(work))
process(work);
}
我也在 producers_num
上尝试了 std::memory_order_acq_rel
的读-修改-写操作,也没有用。
加分题:
此算法与 MPSC 队列一起使用,它已经在内部进行了一些同步。将它们结合起来以获得更好的性能会很好。你知道 "closable" MPSC 队列的算法吗?
我认为 closed = true;
确实需要 seq_cst 以确保它对其他线程可见 在 您第一次检查 producers_num
之前.否则可以这样排序:
- 制作人:
++producers_num;
- 消费者:
producers_num == 0
- 生产者:
if (!closed)
发现它仍然开放 - 消费者:
close.store(true, release)
变得全局可见。 - 消费者:
work_queue.pop(work)
发现队列为空 - 生产者:
work_queue.push(std::move(work));
在消费者停止寻找后将工作添加到队列。
如果您在返回之前让消费者检查 producers_num == 0
,您仍然可以避免seq_cst,例如
while (producers_num != 0)
// while (producers_num.load(std::memory_order_acquire) != 0)
std::this_thread::yield();
do {
Work work;
while (work_queue.pop(work))
process(work);
} while(producers_num.load(acquire) != 0);
// safe if pop included a full barrier, I think
我不是 100% 确定我有这个权利,但我认为在完全障碍后检查 producer_num
就足够了。
不过,生产方确实需要++producers_num;
至少acq_rel,否则可以重新排序过去if (!closed)
。 (在它之后,在 if(!closed)
之前的获取栅栏也可能有效)。
由于您只想使用队列一次,因此不需要回绕,而且可能会简单得多。就像一个原子生产者位置计数器,作者递增以声明一个位置,如果他们得到一个位置 > 大小,那么队列已满。不过我还没有考虑完整的细节。
这可能会为上述问题提供更清晰的解决方案,也许是让消费者查看该写入索引以查看是否有任何生产者