boost::lockfree::queue(在多线程程序中)是否可锁定?

Is boost::lockfree::queue (in multithreaded program) lockable?

我正在开发一个程序,其中 2+ (gstreamer) boost:: 线程和相同数量的 boost:: 线程一个虚拟应用程序同时使用 a queue。现在这个 queue 用于 gstreamer 线程的任务与 其对应的 虚拟应用程序线程 [=66] 之间的 同步 =].

queue是一个EVENTqueue:其中EVENT是一个结构

typedef struct EVENT{
    EVENT_TYPE Ev_Type;  // EVENT_TYPE is enum of Events
    EVENT_DATA Ev_Data;  // EVENT_DATA is union of data to be stored for that event
}Event_;

谷歌搜索后,我遇到了 queue 的这两个选项:lockfree::queue & lockfree::spsc_queue,提示lockfree::queues用于多线程应用

CONFUSION: Why the name lockFREE? is it suggesting cannot be (mutex) locked?

Also see this example, it says "boost::lockfree::queue is not lockfree"

Mind=blown...

嗯,然后我尝试按照示例 (above link) 并实现此 queue

class Foo {
protected:
    boost::lockfree::queue<EVENT> mqSttEventQueue;
public:
    unsigned int SetEventIntoQueue(EVENT *psEvent);
};

其定义为:

unsigned int Foo::SetEventIntoQueue(EVENT *psEvent) {
    if(mqSttEventQueue.push(*psEvent)){
         //notify that event is in queue;
    }
}

编译成功。但是我这里完全运行一头雾水

问题:

  • 为什么示例将 queue 声明为

    boost::lockfree::queue<int> queue(128);

128 有什么用?是说 queue 大小是 128 (bytes/items) 吗? queue<int> 是否在 queue 中声明数据类型?

  • 为什么它对我的程序不起作用

    boost::lockfree::queue<EVENT> mqSttEventQueue(128);

如果我这样声明它,它会出现编译错误

error: expected identifier before numeric constant

boost::lockfree::queue<EVENT> mqSttEventQueue(128);
                                              ^~~

PS:- 我真的不确定在这里放什么标题...如果可以的话请编辑它。

Why the name lockFREE? is it suggesting cannot be (mutex) locked?

当然任何东西都可以被锁定;您将互斥锁放在数据结构之外,让每个接触数据结构的线程都使用它。

boost::lockfree::queue 提供 unsynchronized_popunsynchronized_push 供您确保只有一个线程可以访问队列的情况使用。

但是lockfree::queue和无锁算法/数据结构的主要目的是它们不需要锁定:多线程可以安全地写入 and/or 同时读取。


“无锁”在编程中有 2 个含义,导致可能令人困惑但真实的陈述,如“这个无锁算法不是无锁的”。

  • 随意使用:无锁的同义词 - 在没有互斥锁的情况下实现,使用原子加载、存储和 RMW 操作,如 CAS 或 std::atomic::atomic_fetch_add。例如参见 [​​=29=].

    std::shared_ptr 使用无锁原子操作来管理其控制块。 C++11 std::atomic<> provides lockless primitives for custom algorithms. See 。通常在 C++11 中,多个线程对同一变量的非同步访问是未定义的行为。 (除非它们都是只读的。)但是 std::atomic 为您提供定义明确的行为,您可以选择顺序一致性 acquire/release 或宽松的内存排序。

  • 技术计算机科学意义:永远休眠或被杀死的线程不会阻塞其余线程。即保证整个程序的前进进度(至少一个线程)。 (无等待是指线程永远不必重试)。参见 https://en.wikipedia.org/wiki/Non-blocking_algorithm。 CAS 重试循环是一个 class 无锁但非无等待的典型例子。无等待是 RCU(读取-复制-更新)读取线程之类的东西,或者根据定义,硬件上的 atomic_fetch_add 将其实现为原语(例如 x86 xadd),而不是LL/SC 或 CAS 重试循环。

大多数无锁多reader/多写队列在技术意义上不是无锁的。通常他们使用循环缓冲区,写者会以某种方式“声明”一个条目(固定其在队列中的顺序)。但在作者 完成 写入条目本身之前无法读取。

有关分析其可能的阻塞行为的示例,请参阅 。写入器以原子方式递增写入索引,然后将数据写入数组条目。如果作者在做这些事情之间睡着了,其他作者可以填写后面的条目,而 readers 则被困在等待声称但未写入的条目。 (我没看过boost::lockfree::queue<T>,不过估计差不多1。)

在实践中性能非常好,writers 和 readers 之间的争用非常少。但理论上,编写器可能会在错误的时刻阻塞并拖延整个队列。


脚注 1:另一个可能的队列选项是链表。在这种情况下,您可以完全构建一个新节点,然后尝试将其 CAS 到列表中。因此,如果您成功添加它,那么其他线程可以立即读取它,因为您已正确设置它的指针。

但是回收问题(安全地释放其他线程可能正在读取的内存以查看是否另一个 reader 已经声明了它们)在垃圾收集语言/环境之外非常棘手。 (例如 Java)


boost::lockfree::queue<int> queue(128); Why 128?

这是条目中的队列(最大)大小。 int 在这种情况下,因为你使用了 queue<int>,呃。如上所述,大多数无锁队列使用固定大小的循环缓冲区。当它需要增长时,它不能像 std::vector 那样重新分配和复制,因为其他线程可以同时读取它。

如记录in the manualboost::lockfree::queue 的第一个 google 命中),explicit queue(size_type) 构造函数采用大小。

您还可以通过将容量用作模板参数来将容量烘焙到类型中。 (因此容量在使用队列的任何地方都成为编译时常量,而不仅仅是在可以从构造函数调用进行常量传播的地方。)

class 显然不强制/要求 2 的幂大小,因此模板大小参数可以通过让 % capacity 操作编译成一个 AND 来优化得更好掩码而不是除法。