ARM 上的无锁 SPSC 队列实现
Lock-free SPSC queue implementation on ARM
我正在尝试为 ARM 编写一个单一的生产者单一消费者队列,我想我已经接近了解 DMB,但需要一些检查(我更熟悉 std::atomic。 )
这是我所在的位置:
bool push(const_reference value)
{
// Check for room
const size_type currentTail = tail;
const size_type nextTail = increment(currentTail);
if (nextTail == head)
return false;
// Write the value
valueArr[currentTail] = value;
// Prevent the consumer from seeing the incremented tail before the
// value is written.
__DMB();
// Increment tail
tail = nextTail;
return true;
}
bool pop(reference valueLocation)
{
// Check for data
const size_type currentHead = head;
if (currentHead == tail)
return false;
// Write the value.
valueLocation = valueArr[currentHead];
// Prevent the producer from seeing the incremented head before the
// value is written.
__DMB();
// Increment the head
head = increment(head);
return true;
}
我的问题是:我的 DMB 位置和理由是否准确?还是仍然了解我失踪了?我特别不确定在处理由其他线程(或中断)更新的变量时条件是否需要一些保护。
- 屏障是必要但不充分的,您还需要 "acquire" 语义来加载其他线程修改的 var。 (或者至少
consume
,但是在没有障碍的情况下实现它需要 asm 创建数据依赖性。编译器在已经具有控制依赖性后不会这样做。)
- 单核系统可以只使用像 GNU C
asm("":::"memory")
或 std::atomic_signal_fence(std::memory_order_release)
这样的编译器屏障,而不是 dmb
。制作一个宏,以便您可以在 SMP 安全屏障或 UP(单处理器)屏障之间进行选择。
head = increment(head);
是 head
的无意义重载,使用本地副本。
- 使用
std::atomic
可移植地获取必要的代码生成。
您通常不需要推出自己的原子; ARM 的现代编译器确实实现了 std::atomic<T>
。但是 AFAIK,没有 std::atomic<>
实现意识到单核系统以避免实际障碍并且只是安全的。可能导致上下文切换的中断。
在单核系统上,你不需要dsb
,只是一个编译器障碍。 CPU 将保留 asm 指令按程序顺序执行的错觉。您只需要确保编译器生成按正确顺序执行操作的 asm。您可以通过使用 std::atomic
和 std::memory_order_relaxed
以及手动 atomic_signal_fence(memory_order_acquire)
或 release
障碍来做到这一点。 (不是 atomic_thread_fence
;那会发出 asm 指令,通常是 dsb
)。
每个线程读取另一个线程修改的变量。通过确保修改仅在访问数组后可见,您正确地进行了修改发布存储。
但这些读取还需要 acquire-loads 才能与那些发布存储 同步。例如。确保 push
在 pop
完成读取同一元素之前没有写入 valueArr[currentTail] = value;
。或者在完整写入之前阅读条目。
没有任何障碍,失败模式将是 if (currentHead == tail) return false;
直到之后才真正从内存中检查 tail
的值
valueLocation = valueArr[currentHead];
发生了。运行时加载重新排序可以在弱排序的 ARM 上轻松实现。如果加载地址对 tail
具有数据依赖性,则可以避免在 SMP 系统上需要屏障(ARM 保证 asm 中的依赖性排序;mo_consume
应该公开的功能)。但是如果编译器只是发出一个分支,那只是一个控制依赖,而不是数据。如果您在 asm 中手工编写,我认为比较设置的标志上的 ldrne r0, [r1, r2]
之类的谓词负载会创建一个 data 依赖项。
编译时重新排序不太可能,但如果它只是阻止编译器执行它无论如何都不会执行的操作,则编译器专用障碍是免费的。
未经测试的实现,编译成看起来不错但没有其他测试的 asm
为 push
做类似的事情。我包含了用于加载获取/存储释放的包装函数,以及 fullbarrier()。 (相当于 Linux 内核的 smp_mb()
宏,定义为编译时或编译+运行时屏障。)
#include <atomic>
#define UNIPROCESSOR
#ifdef UNIPROCESSOR
#define fullbarrier() asm("":::"memory") // GNU C compiler barrier
// atomic_signal_fence(std::memory_order_seq_cst)
#else
#define fullbarrier() __DMB() // or atomic_thread_fence(std::memory_order_seq_cst)
#endif
template <class T>
T load_acquire(std::atomic<T> &x) {
#ifdef UNIPROCESSOR
T tmp = x.load(std::memory_order_relaxed);
std::atomic_signal_fence(std::memory_order_acquire);
// or fullbarrier(); if you want to use that macro
return tmp;
#else
return x.load(std::memory_order_acquire);
// fullbarrier() / __DMB();
#endif
}
template <class T>
void store_release(std::atomic<T> &x, T val) {
#ifdef UNIPROCESSOR
std::atomic_signal_fence(std::memory_order_release);
// or fullbarrier();
x.store(val, std::memory_order_relaxed);
#else
// fullbarrier() / __DMB(); before plain store
return x.store(val, std::memory_order_release);
#endif
}
template <class T>
struct SPSC_queue {
using size_type = unsigned;
using value_type = T;
static const size_type size = 1024;
std::atomic<size_type> head;
value_type valueArr[size];
std::atomic<size_type> tail; // in a separate cache-line from head to reduce contention
bool push(const value_type &value)
{
// Check for room
const size_type currentTail = tail.load(std::memory_order_relaxed); // no other writers to tail, no ordering needed
const size_type nextTail = currentTail + 1; // modulo separately so empty and full are distinguishable.
if (nextTail == load_acquire(head))
return false;
valueArr[currentTail % size] = value;
store_release(tail, nextTail);
return true;
}
};
// instantiate the template for int so we can look at the asm
template bool SPSC_queue<int>::push(const value_type &value);
如果您使用 -DUNIPROCESSOR
和 g++9.2 -O3 -mcpu=cortex-a15
,则 on the Godbolt compiler explorer 零障碍编译干净(只是为了选择一个随机的现代 ARM 核心,以便 GCC 可以内联 std::atomic
load/store 非单处理器情况下的功能和障碍。
我正在尝试为 ARM 编写一个单一的生产者单一消费者队列,我想我已经接近了解 DMB,但需要一些检查(我更熟悉 std::atomic。 )
这是我所在的位置:
bool push(const_reference value)
{
// Check for room
const size_type currentTail = tail;
const size_type nextTail = increment(currentTail);
if (nextTail == head)
return false;
// Write the value
valueArr[currentTail] = value;
// Prevent the consumer from seeing the incremented tail before the
// value is written.
__DMB();
// Increment tail
tail = nextTail;
return true;
}
bool pop(reference valueLocation)
{
// Check for data
const size_type currentHead = head;
if (currentHead == tail)
return false;
// Write the value.
valueLocation = valueArr[currentHead];
// Prevent the producer from seeing the incremented head before the
// value is written.
__DMB();
// Increment the head
head = increment(head);
return true;
}
我的问题是:我的 DMB 位置和理由是否准确?还是仍然了解我失踪了?我特别不确定在处理由其他线程(或中断)更新的变量时条件是否需要一些保护。
- 屏障是必要但不充分的,您还需要 "acquire" 语义来加载其他线程修改的 var。 (或者至少
consume
,但是在没有障碍的情况下实现它需要 asm 创建数据依赖性。编译器在已经具有控制依赖性后不会这样做。) - 单核系统可以只使用像 GNU C
asm("":::"memory")
或std::atomic_signal_fence(std::memory_order_release)
这样的编译器屏障,而不是dmb
。制作一个宏,以便您可以在 SMP 安全屏障或 UP(单处理器)屏障之间进行选择。 head = increment(head);
是head
的无意义重载,使用本地副本。- 使用
std::atomic
可移植地获取必要的代码生成。
您通常不需要推出自己的原子; ARM 的现代编译器确实实现了 std::atomic<T>
。但是 AFAIK,没有 std::atomic<>
实现意识到单核系统以避免实际障碍并且只是安全的。可能导致上下文切换的中断。
在单核系统上,你不需要dsb
,只是一个编译器障碍。 CPU 将保留 asm 指令按程序顺序执行的错觉。您只需要确保编译器生成按正确顺序执行操作的 asm。您可以通过使用 std::atomic
和 std::memory_order_relaxed
以及手动 atomic_signal_fence(memory_order_acquire)
或 release
障碍来做到这一点。 (不是 atomic_thread_fence
;那会发出 asm 指令,通常是 dsb
)。
每个线程读取另一个线程修改的变量。通过确保修改仅在访问数组后可见,您正确地进行了修改发布存储。
但这些读取还需要 acquire-loads 才能与那些发布存储 同步。例如。确保 push
在 pop
完成读取同一元素之前没有写入 valueArr[currentTail] = value;
。或者在完整写入之前阅读条目。
没有任何障碍,失败模式将是 if (currentHead == tail) return false;
直到之后才真正从内存中检查 tail
的值
valueLocation = valueArr[currentHead];
发生了。运行时加载重新排序可以在弱排序的 ARM 上轻松实现。如果加载地址对 tail
具有数据依赖性,则可以避免在 SMP 系统上需要屏障(ARM 保证 asm 中的依赖性排序;mo_consume
应该公开的功能)。但是如果编译器只是发出一个分支,那只是一个控制依赖,而不是数据。如果您在 asm 中手工编写,我认为比较设置的标志上的 ldrne r0, [r1, r2]
之类的谓词负载会创建一个 data 依赖项。
编译时重新排序不太可能,但如果它只是阻止编译器执行它无论如何都不会执行的操作,则编译器专用障碍是免费的。
未经测试的实现,编译成看起来不错但没有其他测试的 asm
为 push
做类似的事情。我包含了用于加载获取/存储释放的包装函数,以及 fullbarrier()。 (相当于 Linux 内核的 smp_mb()
宏,定义为编译时或编译+运行时屏障。)
#include <atomic>
#define UNIPROCESSOR
#ifdef UNIPROCESSOR
#define fullbarrier() asm("":::"memory") // GNU C compiler barrier
// atomic_signal_fence(std::memory_order_seq_cst)
#else
#define fullbarrier() __DMB() // or atomic_thread_fence(std::memory_order_seq_cst)
#endif
template <class T>
T load_acquire(std::atomic<T> &x) {
#ifdef UNIPROCESSOR
T tmp = x.load(std::memory_order_relaxed);
std::atomic_signal_fence(std::memory_order_acquire);
// or fullbarrier(); if you want to use that macro
return tmp;
#else
return x.load(std::memory_order_acquire);
// fullbarrier() / __DMB();
#endif
}
template <class T>
void store_release(std::atomic<T> &x, T val) {
#ifdef UNIPROCESSOR
std::atomic_signal_fence(std::memory_order_release);
// or fullbarrier();
x.store(val, std::memory_order_relaxed);
#else
// fullbarrier() / __DMB(); before plain store
return x.store(val, std::memory_order_release);
#endif
}
template <class T>
struct SPSC_queue {
using size_type = unsigned;
using value_type = T;
static const size_type size = 1024;
std::atomic<size_type> head;
value_type valueArr[size];
std::atomic<size_type> tail; // in a separate cache-line from head to reduce contention
bool push(const value_type &value)
{
// Check for room
const size_type currentTail = tail.load(std::memory_order_relaxed); // no other writers to tail, no ordering needed
const size_type nextTail = currentTail + 1; // modulo separately so empty and full are distinguishable.
if (nextTail == load_acquire(head))
return false;
valueArr[currentTail % size] = value;
store_release(tail, nextTail);
return true;
}
};
// instantiate the template for int so we can look at the asm
template bool SPSC_queue<int>::push(const value_type &value);
如果您使用 -DUNIPROCESSOR
和 g++9.2 -O3 -mcpu=cortex-a15
,则 on the Godbolt compiler explorer 零障碍编译干净(只是为了选择一个随机的现代 ARM 核心,以便 GCC 可以内联 std::atomic
load/store 非单处理器情况下的功能和障碍。