为什么这个无锁队列有效?
Why does this lock free queue work?
我正在重写一个旧的无锁队列实现,我开始对所有内容使用 memory_order_relaxed 以收紧内存语义并在以后添加独立的栅栏等。但奇怪的是,它正在工作..我已经尝试使用最大优化设置使用 XCode 和 VS2015 进行编译。大约 1-1.5 年前,我的代码与此失败非常相似,这是我最后一次写这个。
这是我的队列:
#ifndef __LOCKFREEMPMCQUEUE_H__
#define __LOCKFREEMPMCQUEUE_H__
#include <atomic>
template <typename T>
class LockFreeMPMCQueue
{
public:
explicit LockFreeMPMCQueue(size_t size)
: m_data(new T[size])
, m_size(size)
, m_head_1(0)
, m_head_2(0)
, m_tail_1(0)
, m_tail_2(0)
{
}
virtual ~LockFreeMPMCQueue() { delete m_data; }
bool try_enqueue(const T& value)
{
size_t tail = m_tail_1.load(std::memory_order_relaxed);
const size_t head = m_head_2.load(std::memory_order_relaxed);
const size_t count = tail - head;
if (count == m_size)
{
return false;
}
if (std::atomic_compare_exchange_weak_explicit(&m_tail_1, &tail, (tail + 1), std::memory_order_relaxed,
std::memory_order_relaxed) == false)
{
return false;
}
m_data[tail % m_size] = value;
while (m_tail_2.load(std::memory_order_relaxed) != tail)
{
std::this_thread::yield();
}
m_tail_2.store(tail + 1, std::memory_order_relaxed);
return true;
}
bool try_dequeue(T& out)
{
size_t head = m_head_1.load(std::memory_order_relaxed);
const size_t tail = m_tail_2.load(std::memory_order_relaxed);
if (head == tail)
{
return false;
}
if (std::atomic_compare_exchange_weak_explicit(&m_head_1, &head, (head + 1), std::memory_order_relaxed,
std::memory_order_relaxed) == false)
{
return false;
}
out = m_data[head % m_size];
while (m_head_2.load(std::memory_order_relaxed) != head)
{
std::this_thread::yield();
}
m_head_2.store(head + 1, std::memory_order_relaxed);
return true;
}
size_t capacity() const { return m_size; }
private:
T* m_data;
size_t m_size;
std::atomic<size_t> m_head_1;
std::atomic<size_t> m_head_2;
std::atomic<size_t> m_tail_1;
std::atomic<size_t> m_tail_2;
};
#endif
这是我写的测试:
#include <chrono>
#include <thread>
#include <vector>
#include "LockFreeMPMCQueue.h"
std::chrono::microseconds::rep test(LockFreeMPMCQueue<size_t>& queue, char* memory, const size_t num_threads, const size_t num_values)
{
memset(memory, 0, sizeof(char) * num_values);
const size_t num_values_per_thread = num_values / num_threads;
std::thread* reader_threads = new std::thread[num_threads];
std::thread* writer_threads = new std::thread[num_threads];
auto start = std::chrono::high_resolution_clock::now();
for (size_t i = 0; i < num_threads; ++i)
{
reader_threads[i] = std::thread([i, &queue, memory, num_values_per_thread]()
{
for (size_t x = 0; x < num_values_per_thread; ++x)
{
size_t value;
while (!queue.try_dequeue(value))
{
}
memory[value] = 1;
}
});
}
for (size_t i = 0; i < num_threads; ++i)
{
writer_threads[i] = std::thread([i, &queue, num_values_per_thread]()
{
const size_t offset = i * num_values_per_thread;
for (size_t x = 0; x < num_values_per_thread; ++x)
{
const size_t value = offset + x;
while (!queue.try_enqueue(value))
{
}
}
});
}
for (size_t i = 0; i < num_threads; ++i)
{
reader_threads[i].join();
writer_threads[i].join();
}
auto time_taken = std::chrono::high_resolution_clock::now() - start;
delete[] reader_threads;
delete[] writer_threads;
bool fail = false;
for (size_t i = 0; i < num_values; ++i)
{
if (memory[i] == 0)
{
printf("%u = 0\n", i);
fail = true;
}
}
if (fail)
{
printf("FAIL!\n");
}
return std::chrono::duration_cast<std::chrono::milliseconds>(time_taken).count();
}
int main(int argc, char* argv[])
{
const size_t num_threads_max = 16;
const size_t num_values = 1 << 12;
const size_t queue_size = 128;
const size_t num_samples = 128;
LockFreeMPMCQueue<size_t> queue( queue_size );
char* memory = new char[num_values];
const double inv_num_samples = 1.0 / double( num_samples );
for( size_t num_threads = 1; num_threads <= num_threads_max; num_threads *= 2 )
{
double avg_time_taken = 0.0;
for( size_t i = 0; i < num_samples; ++i )
{
avg_time_taken += test( queue, memory, num_threads, num_values ) * inv_num_samples;
}
printf("%u threads, %u ms\n", num_threads, avg_time_taken);
}
delete[] memory;
char c;
scanf("%c", &c);
return 0;
}
非常感谢任何帮助!
内存顺序仅指定您从生成的代码中请求的最低保证。编译器和硬件可以随意提供更强的保证。
请特别注意,在 x86 平台上,许多内存访问始终由硬件同步(例如,x86 上的加载始终顺序一致)。这就是为什么在 x86 上运行良好的代码在移植到 ARM 或 PowerPC 时经常中断,而不考虑这些平台上较弱的默认同步。
Herb Sutter 在他的 Atomic Weapons talk from C++ and Beyond 2012 (starts at about 31 minutes into the video; or look for the slides 标题为 代码生成 中有一个很好的比较 table,从第 34 页开始),他在其中展示了不同的内存排序可能会或可能不会导致为不同平台生成不同的代码。
底线:仅仅因为您的代码现在在您的机器上似乎可以正常工作,并不意味着它是正确的。这就是为什么你不应该乱用内存顺序的一个主要原因,除非你确切地知道你在做什么(即使那样你可能仍然不应该这样做)。
我正在重写一个旧的无锁队列实现,我开始对所有内容使用 memory_order_relaxed 以收紧内存语义并在以后添加独立的栅栏等。但奇怪的是,它正在工作..我已经尝试使用最大优化设置使用 XCode 和 VS2015 进行编译。大约 1-1.5 年前,我的代码与此失败非常相似,这是我最后一次写这个。
这是我的队列:
#ifndef __LOCKFREEMPMCQUEUE_H__
#define __LOCKFREEMPMCQUEUE_H__
#include <atomic>
template <typename T>
class LockFreeMPMCQueue
{
public:
explicit LockFreeMPMCQueue(size_t size)
: m_data(new T[size])
, m_size(size)
, m_head_1(0)
, m_head_2(0)
, m_tail_1(0)
, m_tail_2(0)
{
}
virtual ~LockFreeMPMCQueue() { delete m_data; }
bool try_enqueue(const T& value)
{
size_t tail = m_tail_1.load(std::memory_order_relaxed);
const size_t head = m_head_2.load(std::memory_order_relaxed);
const size_t count = tail - head;
if (count == m_size)
{
return false;
}
if (std::atomic_compare_exchange_weak_explicit(&m_tail_1, &tail, (tail + 1), std::memory_order_relaxed,
std::memory_order_relaxed) == false)
{
return false;
}
m_data[tail % m_size] = value;
while (m_tail_2.load(std::memory_order_relaxed) != tail)
{
std::this_thread::yield();
}
m_tail_2.store(tail + 1, std::memory_order_relaxed);
return true;
}
bool try_dequeue(T& out)
{
size_t head = m_head_1.load(std::memory_order_relaxed);
const size_t tail = m_tail_2.load(std::memory_order_relaxed);
if (head == tail)
{
return false;
}
if (std::atomic_compare_exchange_weak_explicit(&m_head_1, &head, (head + 1), std::memory_order_relaxed,
std::memory_order_relaxed) == false)
{
return false;
}
out = m_data[head % m_size];
while (m_head_2.load(std::memory_order_relaxed) != head)
{
std::this_thread::yield();
}
m_head_2.store(head + 1, std::memory_order_relaxed);
return true;
}
size_t capacity() const { return m_size; }
private:
T* m_data;
size_t m_size;
std::atomic<size_t> m_head_1;
std::atomic<size_t> m_head_2;
std::atomic<size_t> m_tail_1;
std::atomic<size_t> m_tail_2;
};
#endif
这是我写的测试:
#include <chrono>
#include <thread>
#include <vector>
#include "LockFreeMPMCQueue.h"
std::chrono::microseconds::rep test(LockFreeMPMCQueue<size_t>& queue, char* memory, const size_t num_threads, const size_t num_values)
{
memset(memory, 0, sizeof(char) * num_values);
const size_t num_values_per_thread = num_values / num_threads;
std::thread* reader_threads = new std::thread[num_threads];
std::thread* writer_threads = new std::thread[num_threads];
auto start = std::chrono::high_resolution_clock::now();
for (size_t i = 0; i < num_threads; ++i)
{
reader_threads[i] = std::thread([i, &queue, memory, num_values_per_thread]()
{
for (size_t x = 0; x < num_values_per_thread; ++x)
{
size_t value;
while (!queue.try_dequeue(value))
{
}
memory[value] = 1;
}
});
}
for (size_t i = 0; i < num_threads; ++i)
{
writer_threads[i] = std::thread([i, &queue, num_values_per_thread]()
{
const size_t offset = i * num_values_per_thread;
for (size_t x = 0; x < num_values_per_thread; ++x)
{
const size_t value = offset + x;
while (!queue.try_enqueue(value))
{
}
}
});
}
for (size_t i = 0; i < num_threads; ++i)
{
reader_threads[i].join();
writer_threads[i].join();
}
auto time_taken = std::chrono::high_resolution_clock::now() - start;
delete[] reader_threads;
delete[] writer_threads;
bool fail = false;
for (size_t i = 0; i < num_values; ++i)
{
if (memory[i] == 0)
{
printf("%u = 0\n", i);
fail = true;
}
}
if (fail)
{
printf("FAIL!\n");
}
return std::chrono::duration_cast<std::chrono::milliseconds>(time_taken).count();
}
int main(int argc, char* argv[])
{
const size_t num_threads_max = 16;
const size_t num_values = 1 << 12;
const size_t queue_size = 128;
const size_t num_samples = 128;
LockFreeMPMCQueue<size_t> queue( queue_size );
char* memory = new char[num_values];
const double inv_num_samples = 1.0 / double( num_samples );
for( size_t num_threads = 1; num_threads <= num_threads_max; num_threads *= 2 )
{
double avg_time_taken = 0.0;
for( size_t i = 0; i < num_samples; ++i )
{
avg_time_taken += test( queue, memory, num_threads, num_values ) * inv_num_samples;
}
printf("%u threads, %u ms\n", num_threads, avg_time_taken);
}
delete[] memory;
char c;
scanf("%c", &c);
return 0;
}
非常感谢任何帮助!
内存顺序仅指定您从生成的代码中请求的最低保证。编译器和硬件可以随意提供更强的保证。
请特别注意,在 x86 平台上,许多内存访问始终由硬件同步(例如,x86 上的加载始终顺序一致)。这就是为什么在 x86 上运行良好的代码在移植到 ARM 或 PowerPC 时经常中断,而不考虑这些平台上较弱的默认同步。
Herb Sutter 在他的 Atomic Weapons talk from C++ and Beyond 2012 (starts at about 31 minutes into the video; or look for the slides 标题为 代码生成 中有一个很好的比较 table,从第 34 页开始),他在其中展示了不同的内存排序可能会或可能不会导致为不同平台生成不同的代码。
底线:仅仅因为您的代码现在在您的机器上似乎可以正常工作,并不意味着它是正确的。这就是为什么你不应该乱用内存顺序的一个主要原因,除非你确切地知道你在做什么(即使那样你可能仍然不应该这样做)。