Boost的无锁队列有可能丢元素吗?
Is it possible for Boost's lockfree queue to lose elements?
我有一个单线程应用程序,它使用boost::lockfree::queue
(来自Boost 1.57)。 (我使用队列的原因是它是多线程应用程序中也使用的库的一部分。)
队列似乎以某种方式丢失了元素。
使用队列的class表现如下:
- 有两个单独的函数在队列上调用
push
。除了队列本身提供的内容之外,没有线程安全逻辑。
- 有一个函数消耗队列中的元素;它使用
while(my_queue.pop(temp_element))
循环消耗 all 元素。此函数受互斥锁保护,以确保所有元素都进入 单个 消费者。但是,如果连续检测到两个 duplicate 个元素(与 ==
相比),则跳过第二个元素。
我观察到元素似乎没有在系统中持续传播,因此使用 std::atomic_uint
s,我创建了一些计数器来检查离开队列的警报数量是否与进入队列的警报数量相同:
- 两种
push
ing 方法中的每一种都有单独的计数器。我们称它们为 pushed_method1
和 pushed_method2
.
"processed" 元素和 "skipped" 元素有单独的计数器。我们称它们为 consumed
和 skipped
。在循环中,它们更新如下(重命名变量,并在 [square brackets]
中使用描述性伪代码):
ElementType previous{[ initialize in invalid state ]};
while (my_queue.pop(temp_element))
{
if (! [ check if `previous` is valid ]
|| previous != temp_element)
{
[ ... log a message ]
[ ... insert the element into a vector to be returned ]
++consumed;
}
else
{
// ... log a message
++skipped;
}
previous = temp_element;
}
在循环之后,我检查计数器和队列的状态并进行健全性检查:
auto postconsume_pushed_method1 = pushed_method1.load();
auto postconsume_pushed_method2 = pushed_method2.load();
auto all_consumed = my_queue.empty();
assert(
!all_consumed
|| postconsume_pushed_method1 + postconsume_pushed_method2
== consumed + skipped);
这个断言通常 通过——但在我的单线程应用中它有时会失败 ]. (它有时在多线程情况下也会失败,但我希望关注单线程情况以简化问题。)
在多线程的情况下,我可以想象操作被重新排序,以便在 .empty()
检查之后出现一个或多个 load
s,但我希望这是不是这种情况,因为我希望 std::atomic
和 boost::lockfree
包含用于此类事情的内存栅栏。
然而,在单线程的情况下,这个应该无关紧要,因为元素应该永远不会是push
在应用程序执行 "consume" 方法时编辑,并且警报队列应该 始终 在循环之后为空(因为循环不会中断,直到 pop
returns false
).
我是不是误会了什么?我的逻辑有错误吗?我在 boost::lockfree::queue
中发现错误的可能性很小吗?
队列是无锁的,因此push
队列满时不会阻塞。
lockfree::queue::push
returns 一个布尔值,表示元素是否已进入队列。当队列已满时 push
returns false
,因此您可能想检查一下这种情况。
我有一个单线程应用程序,它使用boost::lockfree::queue
(来自Boost 1.57)。 (我使用队列的原因是它是多线程应用程序中也使用的库的一部分。)
队列似乎以某种方式丢失了元素。
使用队列的class表现如下:
- 有两个单独的函数在队列上调用
push
。除了队列本身提供的内容之外,没有线程安全逻辑。 - 有一个函数消耗队列中的元素;它使用
while(my_queue.pop(temp_element))
循环消耗 all 元素。此函数受互斥锁保护,以确保所有元素都进入 单个 消费者。但是,如果连续检测到两个 duplicate 个元素(与==
相比),则跳过第二个元素。
我观察到元素似乎没有在系统中持续传播,因此使用 std::atomic_uint
s,我创建了一些计数器来检查离开队列的警报数量是否与进入队列的警报数量相同:
- 两种
push
ing 方法中的每一种都有单独的计数器。我们称它们为pushed_method1
和pushed_method2
. "processed" 元素和 "skipped" 元素有单独的计数器。我们称它们为
consumed
和skipped
。在循环中,它们更新如下(重命名变量,并在[square brackets]
中使用描述性伪代码):ElementType previous{[ initialize in invalid state ]}; while (my_queue.pop(temp_element)) { if (! [ check if `previous` is valid ] || previous != temp_element) { [ ... log a message ] [ ... insert the element into a vector to be returned ] ++consumed; } else { // ... log a message ++skipped; } previous = temp_element; }
在循环之后,我检查计数器和队列的状态并进行健全性检查:
auto postconsume_pushed_method1 = pushed_method1.load();
auto postconsume_pushed_method2 = pushed_method2.load();
auto all_consumed = my_queue.empty();
assert(
!all_consumed
|| postconsume_pushed_method1 + postconsume_pushed_method2
== consumed + skipped);
这个断言通常 通过——但在我的单线程应用中它有时会失败 ]. (它有时在多线程情况下也会失败,但我希望关注单线程情况以简化问题。)
在多线程的情况下,我可以想象操作被重新排序,以便在 .empty()
检查之后出现一个或多个 load
s,但我希望这是不是这种情况,因为我希望 std::atomic
和 boost::lockfree
包含用于此类事情的内存栅栏。
然而,在单线程的情况下,这个应该无关紧要,因为元素应该永远不会是push
在应用程序执行 "consume" 方法时编辑,并且警报队列应该 始终 在循环之后为空(因为循环不会中断,直到 pop
returns false
).
我是不是误会了什么?我的逻辑有错误吗?我在 boost::lockfree::queue
中发现错误的可能性很小吗?
队列是无锁的,因此push
队列满时不会阻塞。
lockfree::queue::push
returns 一个布尔值,表示元素是否已进入队列。当队列已满时 push
returns false
,因此您可能想检查一下这种情况。