C++ 无锁队列实现中的虚假下溢
Spurious underflow in C++ lock-free queue implementation
我正在尝试实现一个使用线性循环缓冲区来存储数据的无锁队列。与通用无锁队列相比,我有以下放宽条件:
- 我知道将存储在队列中的最坏情况下的元素数量。 队列是对一组固定元素进行操作的系统的一部分。该代码永远不会尝试在队列中存储更多元素,因为此固定集合中有元素。
- 否 multi-producer/multi-consumer. 队列将用于 multi-producer/single-consumer 或 single-producer/multi-consumer设置。
概念上,队列实现如下
- 标准二次幂环形缓冲区。底层数据结构是使用power-of-two trick的标准环形缓冲区。读写索引只会递增。当使用简单的位掩码对数组进行索引时,它们被限制在底层数组的大小。读指针在
pop()
中自动递增,写指针在 push()
. 中自动递增
- 大小变量门访问
pop()
。 另一个 "size" 变量跟踪队列中元素的数量。这消除了对读取和写入索引执行算术的需要。大小变量在整个写操作发生后自动递增,即数据已写入后备存储 并且 写游标已递增。我正在使用 compare-and-swap (CAS) 操作以原子方式减少 pop()
中的大小,并且仅在大小不为零时才继续。这样pop()
应该可以保证return有效数据。
我的队列实现如下。请注意,只要 pop()
尝试读取之前由 push()
写入的内存,调试代码就会停止执行。这永远不应该发生,因为 ‒ 至少在概念上是这样的 ‒ pop()
可能只有在队列中有元素时才会继续(不应该有下溢)。
#include <atomic>
#include <cstdint>
#include <csignal> // XXX for debugging
template <typename T>
class Queue {
private:
uint32_t m_data_size; // Number of elements allocated
std::atomic<T> *m_data; // Queue data, size is power of two
uint32_t m_mask; // Bitwise AND mask for m_rd_ptr and m_wr_ptr
std::atomic<uint32_t> m_rd_ptr; // Circular buffer read pointer
std::atomic<uint32_t> m_wr_ptr; // Circular buffer write pointer
std::atomic<uint32_t> m_size; // Number of elements in the queue
static uint32_t upper_power_of_two(uint32_t v) {
v--; // https://graphics.stanford.edu/~seander/bithacks.html
v |= v >> 1; v |= v >> 2; v |= v >> 4; v |= v >> 8; v |= v >> 16;
v++;
return v;
}
public:
struct Optional { // Minimal replacement for std::optional
bool good;
T value;
Optional() : good(false) {}
Optional(T value) : good(true), value(std::move(value)) {}
explicit operator bool() const { return good; }
};
Queue(uint32_t max_size)
: // XXX Allocate 1 MiB of additional memory for debugging purposes
m_data_size(upper_power_of_two(1024 * 1024 + max_size)),
m_data(new std::atomic<T>[m_data_size]),
m_mask(m_data_size - 1),
m_rd_ptr(0),
m_wr_ptr(0),
m_size(0) {
// XXX Debug code begin
// Fill the memory with a marker so we can detect invalid reads
for (uint32_t i = 0; i < m_data_size; i++) {
m_data[i] = 0xDEADBEAF;
}
// XXX Debug code end
}
~Queue() { delete[] m_data; }
Optional pop() {
// Atomically decrement the size variable
uint32_t size = m_size.load();
while (size != 0 && !m_size.compare_exchange_weak(size, size - 1)) {
}
// The queue is empty, abort
if (size <= 0) {
return Optional();
}
// Read the actual element, atomically increase the read pointer
T res = m_data[(m_rd_ptr++) & m_mask].load();
// XXX Debug code begin
if (res == T(0xDEADBEAF)) {
std::raise(SIGTRAP);
}
// XXX Debug code end
return res;
}
void push(T t) {
m_data[(m_wr_ptr++) & m_mask].store(t);
m_size++;
}
bool empty() const { return m_size == 0; }
};
但是,确实会发生下溢,并且很容易在多线程压力测试中触发。在这个特定的测试中,我维护了两个队列 q1
和 q2
。在主线程中,我将固定数量的元素输入 q1
。两个工作线程从 q1
读取并在紧密循环中推送到 q2
。主线程从q2
读取数据,反馈给q1
.
如果只有一个工作线程 (single-producer/single-consumer) 或只要所有工作线程与主线程位于同一 CPU 上,这就可以正常工作。但是,一旦有两个工作线程被显式调度到与主线程不同的 CPU 上,它就会失败。
下面的代码实现了这个测试
#include <pthread.h>
#include <thread>
#include <vector>
static void queue_stress_test_main(std::atomic<uint32_t> &done_count,
Queue<int> &queue_rd, Queue<int> &queue_wr) {
for (size_t i = 0; i < (1UL << 24); i++) {
auto res = queue_rd.pop();
if (res) {
queue_wr.push(res.value);
}
}
done_count++;
}
static void set_thread_affinity(pthread_t thread, int cpu) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu, &cpuset);
if (pthread_setaffinity_np(thread, sizeof(cpu_set_t),
&cpuset) != 0) {
throw "Error while calling pthread_setaffinity_np";
}
}
int main() {
static constexpr uint32_t n_threads{2U}; // Number of worker threads
//static constexpr uint32_t n_threads{1U}; // < Works fine
static constexpr uint32_t max_size{16U}; // Elements in the queue
std::atomic<uint32_t> done_count{0}; // Number of finished threads
Queue<int> queue1(max_size), queue2(max_size);
// Launch n_threads threads, make sure the main thread and the two worker
// threads are on different CPUs.
std::vector<std::thread> threads;
for (uint32_t i = 0; i < n_threads; i++) {
threads.emplace_back(queue_stress_test_main, std::ref(done_count),
std::ref(queue1), std::ref(queue2));
set_thread_affinity(threads.back().native_handle(), 0);
}
set_thread_affinity(pthread_self(), 1);
//set_thread_affinity(pthread_self(), 0); // < Works fine
// Pump data from queue2 into queue1
uint32_t elems_written = 0;
while (done_count < n_threads || !queue2.empty()) {
// Initially fill queue1 with all values from 0..max_size-1
if (elems_written < max_size) {
queue1.push(elems_written++);
}
// Read elements from queue2 and put them into queue1
auto res = queue2.pop();
if (res) {
queue1.push(res.value);
}
}
// Wait for all threads to finish
for (uint32_t i = 0; i < n_threads; i++) {
threads[i].join();
}
}
大多数时候这个程序会触发队列代码中的陷阱,这意味着 pop()
会尝试读取 push()
从未接触过的内存——尽管 pop()
应该 只有在 push()
被调用的频率至少与 pop()
.
一样多的情况下才会成功
您可以使用
在Linux上使用GCC/clang编译并运行上述程序
c++ -std=c++11 queue.cpp -o queue -lpthread && ./queue
要么只是连接上面的两个代码块,要么下载完整的程序here。
请注意,在谈到无锁数据结构时,我完全是个新手。我非常清楚有大量经过实战检验的 C++ 无锁队列实现。但是,我就是想不通为什么上面的代码不能按预期工作。
您有两个错误,其中之一可能导致您观察到的失败。
让我们看看您的推送代码,除了我们将只允许每个语句执行一个操作:
void push(T t)
{
auto const claimed_index = m_wr_ptr++; /* 1 */
auto const claimed_offset = claimed_index & m_mask; /* 2 */
auto& claimed_data = m_data[claimed_offset]; /* 3 */
claimed_data.store(t); /* 4 */
m_size++; /* 5 */
}
现在,对于具有两个生产者的队列,在操作 1 和 4 之间存在 window 竞争条件漏洞:
之前:
m_rd_ptr == 1
m_wr_ptr == 1
m_size == 0
制片人A:
/* 1 */ claimed_index = 1; m_wr_ptr = 2;
/* 2 */ claimed_offset = 1;
- 调度器让生产者A在这里休眠
制片人乙:
/* 1 */ claimed_index = 2; m_wr_ptr = 3;
/* 2 */ claimed_offset = 2;
/* 3 */ claimed_data = m_data[2];
/* 4 */ claimed_data.store(t);
/* 5 */ m_size = 1;
之后:
m_size == 1
m_rd_ptr == 1
m_wr_ptr == 3
m_data[1] == 0xDEADBEAF
m_data[2] == value_produced_by_B
消费者现在运行,看到 m_size > 0
,并从 m_data[1]
读取,同时将 m_rd_ptr
从 1 增加到 2。但是 m_data[1]
还没有被生产者写入A 还,生产者 B 写信给 m_data[2]
.
第二个错误是 pop()
中的补充情况,当消费者线程在 m_rd_ptr++
操作和 .load()
调用之间中断时。它可能导致读取值乱序,可能乱序到队列已完全循环并覆盖原始值。
仅仅因为单个源语句中的两个操作是原子的并不能使整个语句成为原子。
我正在尝试实现一个使用线性循环缓冲区来存储数据的无锁队列。与通用无锁队列相比,我有以下放宽条件:
- 我知道将存储在队列中的最坏情况下的元素数量。 队列是对一组固定元素进行操作的系统的一部分。该代码永远不会尝试在队列中存储更多元素,因为此固定集合中有元素。
- 否 multi-producer/multi-consumer. 队列将用于 multi-producer/single-consumer 或 single-producer/multi-consumer设置。
概念上,队列实现如下
- 标准二次幂环形缓冲区。底层数据结构是使用power-of-two trick的标准环形缓冲区。读写索引只会递增。当使用简单的位掩码对数组进行索引时,它们被限制在底层数组的大小。读指针在
pop()
中自动递增,写指针在push()
. 中自动递增
- 大小变量门访问
pop()
。 另一个 "size" 变量跟踪队列中元素的数量。这消除了对读取和写入索引执行算术的需要。大小变量在整个写操作发生后自动递增,即数据已写入后备存储 并且 写游标已递增。我正在使用 compare-and-swap (CAS) 操作以原子方式减少pop()
中的大小,并且仅在大小不为零时才继续。这样pop()
应该可以保证return有效数据。
我的队列实现如下。请注意,只要 pop()
尝试读取之前由 push()
写入的内存,调试代码就会停止执行。这永远不应该发生,因为 ‒ 至少在概念上是这样的 ‒ pop()
可能只有在队列中有元素时才会继续(不应该有下溢)。
#include <atomic>
#include <cstdint>
#include <csignal> // XXX for debugging
template <typename T>
class Queue {
private:
uint32_t m_data_size; // Number of elements allocated
std::atomic<T> *m_data; // Queue data, size is power of two
uint32_t m_mask; // Bitwise AND mask for m_rd_ptr and m_wr_ptr
std::atomic<uint32_t> m_rd_ptr; // Circular buffer read pointer
std::atomic<uint32_t> m_wr_ptr; // Circular buffer write pointer
std::atomic<uint32_t> m_size; // Number of elements in the queue
static uint32_t upper_power_of_two(uint32_t v) {
v--; // https://graphics.stanford.edu/~seander/bithacks.html
v |= v >> 1; v |= v >> 2; v |= v >> 4; v |= v >> 8; v |= v >> 16;
v++;
return v;
}
public:
struct Optional { // Minimal replacement for std::optional
bool good;
T value;
Optional() : good(false) {}
Optional(T value) : good(true), value(std::move(value)) {}
explicit operator bool() const { return good; }
};
Queue(uint32_t max_size)
: // XXX Allocate 1 MiB of additional memory for debugging purposes
m_data_size(upper_power_of_two(1024 * 1024 + max_size)),
m_data(new std::atomic<T>[m_data_size]),
m_mask(m_data_size - 1),
m_rd_ptr(0),
m_wr_ptr(0),
m_size(0) {
// XXX Debug code begin
// Fill the memory with a marker so we can detect invalid reads
for (uint32_t i = 0; i < m_data_size; i++) {
m_data[i] = 0xDEADBEAF;
}
// XXX Debug code end
}
~Queue() { delete[] m_data; }
Optional pop() {
// Atomically decrement the size variable
uint32_t size = m_size.load();
while (size != 0 && !m_size.compare_exchange_weak(size, size - 1)) {
}
// The queue is empty, abort
if (size <= 0) {
return Optional();
}
// Read the actual element, atomically increase the read pointer
T res = m_data[(m_rd_ptr++) & m_mask].load();
// XXX Debug code begin
if (res == T(0xDEADBEAF)) {
std::raise(SIGTRAP);
}
// XXX Debug code end
return res;
}
void push(T t) {
m_data[(m_wr_ptr++) & m_mask].store(t);
m_size++;
}
bool empty() const { return m_size == 0; }
};
但是,确实会发生下溢,并且很容易在多线程压力测试中触发。在这个特定的测试中,我维护了两个队列 q1
和 q2
。在主线程中,我将固定数量的元素输入 q1
。两个工作线程从 q1
读取并在紧密循环中推送到 q2
。主线程从q2
读取数据,反馈给q1
.
如果只有一个工作线程 (single-producer/single-consumer) 或只要所有工作线程与主线程位于同一 CPU 上,这就可以正常工作。但是,一旦有两个工作线程被显式调度到与主线程不同的 CPU 上,它就会失败。
下面的代码实现了这个测试
#include <pthread.h>
#include <thread>
#include <vector>
static void queue_stress_test_main(std::atomic<uint32_t> &done_count,
Queue<int> &queue_rd, Queue<int> &queue_wr) {
for (size_t i = 0; i < (1UL << 24); i++) {
auto res = queue_rd.pop();
if (res) {
queue_wr.push(res.value);
}
}
done_count++;
}
static void set_thread_affinity(pthread_t thread, int cpu) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu, &cpuset);
if (pthread_setaffinity_np(thread, sizeof(cpu_set_t),
&cpuset) != 0) {
throw "Error while calling pthread_setaffinity_np";
}
}
int main() {
static constexpr uint32_t n_threads{2U}; // Number of worker threads
//static constexpr uint32_t n_threads{1U}; // < Works fine
static constexpr uint32_t max_size{16U}; // Elements in the queue
std::atomic<uint32_t> done_count{0}; // Number of finished threads
Queue<int> queue1(max_size), queue2(max_size);
// Launch n_threads threads, make sure the main thread and the two worker
// threads are on different CPUs.
std::vector<std::thread> threads;
for (uint32_t i = 0; i < n_threads; i++) {
threads.emplace_back(queue_stress_test_main, std::ref(done_count),
std::ref(queue1), std::ref(queue2));
set_thread_affinity(threads.back().native_handle(), 0);
}
set_thread_affinity(pthread_self(), 1);
//set_thread_affinity(pthread_self(), 0); // < Works fine
// Pump data from queue2 into queue1
uint32_t elems_written = 0;
while (done_count < n_threads || !queue2.empty()) {
// Initially fill queue1 with all values from 0..max_size-1
if (elems_written < max_size) {
queue1.push(elems_written++);
}
// Read elements from queue2 and put them into queue1
auto res = queue2.pop();
if (res) {
queue1.push(res.value);
}
}
// Wait for all threads to finish
for (uint32_t i = 0; i < n_threads; i++) {
threads[i].join();
}
}
大多数时候这个程序会触发队列代码中的陷阱,这意味着 pop()
会尝试读取 push()
从未接触过的内存——尽管 pop()
应该 只有在 push()
被调用的频率至少与 pop()
.
您可以使用
在Linux上使用GCC/clang编译并运行上述程序c++ -std=c++11 queue.cpp -o queue -lpthread && ./queue
要么只是连接上面的两个代码块,要么下载完整的程序here。
请注意,在谈到无锁数据结构时,我完全是个新手。我非常清楚有大量经过实战检验的 C++ 无锁队列实现。但是,我就是想不通为什么上面的代码不能按预期工作。
您有两个错误,其中之一可能导致您观察到的失败。
让我们看看您的推送代码,除了我们将只允许每个语句执行一个操作:
void push(T t)
{
auto const claimed_index = m_wr_ptr++; /* 1 */
auto const claimed_offset = claimed_index & m_mask; /* 2 */
auto& claimed_data = m_data[claimed_offset]; /* 3 */
claimed_data.store(t); /* 4 */
m_size++; /* 5 */
}
现在,对于具有两个生产者的队列,在操作 1 和 4 之间存在 window 竞争条件漏洞:
之前:
m_rd_ptr == 1
m_wr_ptr == 1
m_size == 0
制片人A:
/* 1 */ claimed_index = 1; m_wr_ptr = 2;
/* 2 */ claimed_offset = 1;
- 调度器让生产者A在这里休眠
制片人乙:
/* 1 */ claimed_index = 2; m_wr_ptr = 3;
/* 2 */ claimed_offset = 2;
/* 3 */ claimed_data = m_data[2];
/* 4 */ claimed_data.store(t);
/* 5 */ m_size = 1;
之后:
m_size == 1
m_rd_ptr == 1
m_wr_ptr == 3
m_data[1] == 0xDEADBEAF
m_data[2] == value_produced_by_B
消费者现在运行,看到 m_size > 0
,并从 m_data[1]
读取,同时将 m_rd_ptr
从 1 增加到 2。但是 m_data[1]
还没有被生产者写入A 还,生产者 B 写信给 m_data[2]
.
第二个错误是 pop()
中的补充情况,当消费者线程在 m_rd_ptr++
操作和 .load()
调用之间中断时。它可能导致读取值乱序,可能乱序到队列已完全循环并覆盖原始值。
仅仅因为单个源语句中的两个操作是原子的并不能使整个语句成为原子。