用于成员变量无锁更新的环形分配器?
Ring Allocator For Lockfree Update of Member Variable?
我有一个 class 存储一些传入实时数据的最新值(大约 1.5 亿 events/second)。
假设它看起来像这样:
class DataState
{
Event latest_event;
public:
//pushes event atomically
void push_event(const Event __restrict__* e);
//pulls event atomically
Event pull_event();
};
我需要能够以原子方式推送事件并以严格的顺序保证拉取它们。现在,我知道我可以使用自旋锁,但鉴于大量事件发生率(超过 100 million/second)和高度并发性,我更愿意使用无锁操作。
问题是 Event
的大小是 64 字节。任何当前 X86 CPU(截至 2016 年 8 月)都没有 CMPXCHG64B
指令。因此,如果我使用 std::atomic<Event>
,我必须从 link 到 libatomic
,后者在后台使用互斥体(太慢)。
所以我的解决方案是原子地交换指向该值的指针。问题是动态内存分配成为这些事件发生率的瓶颈。所以...我定义了一个我称之为 "ring allocator":
的东西
/// @brief Lockfree Static short-lived allocator used for a ringbuffer
/// Elements are guaranteed to persist only for "size" calls to get_next()
template<typename T> class RingAllocator {
T *arena;
std::atomic_size_t arena_idx;
const std::size_t arena_size;
public:
/// @brief Creates a new RingAllocator
/// @param size The number of elements in the underlying arena. Make this large enough to avoid overwriting fresh data
RingAllocator<T>(std::size_t size) : arena_size(size)
{
//allocate pool
arena = new T[size];
//zero out pool
std::memset(arena, 0, sizeof(T) * size);
arena_idx = 0;
}
~RingAllocator()
{
delete[] arena;
}
/// @brief Return next element's pointer. Thread-safe
/// @return pointer to next available element
T *get_next()
{
return &arena[arena_idx.exchange(arena_idx++ % arena_size)];
}
};
然后我可以让我的 DataState class 看起来像这样:
class DataState
{
std::atomic<Event*> latest_event;
RingAllocator<Event> event_allocator;
public:
//pushes event atomically
void push_event(const Event __restrict__* e)
{
//store event
Event *new_ptr = event_allocator.get_next()
*new_ptr = *e;
//swap event pointers
latest_event.store(new_ptr, std::memory_order_release);
}
//pulls event atomically
Event pull_event()
{
return *(latest_event.load(std::memory_order_acquire));
}
};
只要我将环分配器的大小设置为可以并发调用函数的线程的最大数量,就不会有 pull_event 可以 return 覆盖数据的风险。此外,一切都超级本地化,因此间接不会导致缓存性能不佳。这种方法有什么可能的陷阱吗?
DataState
class:
我以为它会是一个堆栈或队列,但它不是,所以 push
/ pull
看起来不是很好的方法名称。 (否则实施完全是伪造的)。
它只是一个闩锁,可让您读取任何线程存储的最后一个事件。
没有什么可以阻止连续两次写入覆盖从未被读取的元素。也没有什么可以阻止你阅读同一个元素两次。
如果您只需要在某处复制小块数据,环形缓冲区似乎是一种不错的方法。但是如果你不想失去事件,我认为你不能这样使用它。相反,只需获取一个环形缓冲区条目,然后复制到它并在那里使用它。所以唯一的原子操作应该是递增环形缓冲区位置索引。
环形缓冲区
您可以使 get_next()
更有效率。此行执行原子 post-增量 (fetch_add) 和原子交换:
return &arena[arena_idx.exchange(arena_idx++ % arena_size)];
我什至不确定它是否安全,因为 xchg 可能会从另一个线程踩到 fetch_add。反正就算安全了,也不理想。
你不需要那个。确保 arena_size 始终是 2 的幂,那么您就不需要对共享计数器取模。你可以放手,让每个线程取模供自己使用。它最终会换行,但它是一个二进制整数,所以它会以 2 的幂换行,这是你的竞技场大小的倍数。
我建议存储一个 AND 掩码而不是一个大小,这样就没有 %
编译成 and
指令以外的任何东西的风险,即使它不是一个编译-时间常数。这确保我们避免使用 64 位整数 div
指令。
template<typename T> class RingAllocator {
T *arena;
std::atomic_size_t arena_idx;
const std::size_t size_mask; // maybe even make this a template parameter?
public:
RingAllocator<T>(std::size_t size)
: arena_idx(0), size_mask(size-1)
{
// verify that size is actually a power of two, so the mask is all-ones in the low bits, and all-zeros in the high bits.
// so that i % size == i & size_mask for all i
...
}
...
T *get_next() {
size_t idx = arena_idx.fetch_add(1, std::memory_order_relaxed); // still atomic, but we don't care which order different threads take blocks in
idx &= size_mask; // modulo our local copy of the idx
return &arena[idx];
}
};
如果使用 calloc
而不是 new + memset,分配 arena 会更有效率。 OS 在将它们提供给 user-space 进程之前已经将页面归零(以防止信息泄漏),因此将它们全部写入只是浪费工作。
arena = new T[size];
std::memset(arena, 0, sizeof(T) * size);
// vs.
arena = (T*)calloc(size, sizeof(T));
自己写页面确实会出错,所以它们都连接到真实的物理页面,而不是系统范围共享物理零页面的写时复制映射(就像它们在 new/malloc/calloc).在 NUMA 系统上,选择的物理页面可能取决于哪个线程实际接触了该页面,而不是哪个线程进行了分配。但是由于您正在重用池,第一个写入页面的核心可能不是最终使用它最多的核心。
也许可以在微基准测试/性能计数器中寻找一些东西。
您是否看过任何可用的 C++ Disruptor (Java) 端口?
虽然它们不是完整的端口,但它们可以提供您需要的一切。我目前正在开发功能更齐全的端口,但还没有完全准备好。
As long as I size my ring allocator to the max # of threads that may concurrently call the functions, there's no risk of overwriting data that pull_event could return. .... Any possible pitfalls with this approach?
陷阱是,IIUC,你的说法是错误的。
如果我只有 2 个线程,环形缓冲区中有 10 个元素,第一个线程可以调用 pull_event 一次,然后是 "mid-pulling",然后第二个线程可以调用 push 10 次,覆盖线程 1 正在拉动的内容。
同样,假设我正确理解您的代码。
另外,如上所述,
return &arena[arena_idx.exchange(arena_idx++ % arena_size)];
那个arena_idx++
里面的交换就同一个变量,只是看起来不对。而事实上是错误的。两个线程可以递增它 - ThreadA 递增到 8,threadB 递增到 9,然后 threadB 将其交换为 9,然后 threadA 将其交换为 8。哎呀。
原子(op1)@原子(op2)!=原子(op1@op2)
我担心未显示的代码还有什么问题。我并不是说这是一种侮辱 - 无锁并不容易。
我有一个 class 存储一些传入实时数据的最新值(大约 1.5 亿 events/second)。
假设它看起来像这样:
class DataState
{
Event latest_event;
public:
//pushes event atomically
void push_event(const Event __restrict__* e);
//pulls event atomically
Event pull_event();
};
我需要能够以原子方式推送事件并以严格的顺序保证拉取它们。现在,我知道我可以使用自旋锁,但鉴于大量事件发生率(超过 100 million/second)和高度并发性,我更愿意使用无锁操作。
问题是 Event
的大小是 64 字节。任何当前 X86 CPU(截至 2016 年 8 月)都没有 CMPXCHG64B
指令。因此,如果我使用 std::atomic<Event>
,我必须从 link 到 libatomic
,后者在后台使用互斥体(太慢)。
所以我的解决方案是原子地交换指向该值的指针。问题是动态内存分配成为这些事件发生率的瓶颈。所以...我定义了一个我称之为 "ring allocator":
的东西/// @brief Lockfree Static short-lived allocator used for a ringbuffer
/// Elements are guaranteed to persist only for "size" calls to get_next()
template<typename T> class RingAllocator {
T *arena;
std::atomic_size_t arena_idx;
const std::size_t arena_size;
public:
/// @brief Creates a new RingAllocator
/// @param size The number of elements in the underlying arena. Make this large enough to avoid overwriting fresh data
RingAllocator<T>(std::size_t size) : arena_size(size)
{
//allocate pool
arena = new T[size];
//zero out pool
std::memset(arena, 0, sizeof(T) * size);
arena_idx = 0;
}
~RingAllocator()
{
delete[] arena;
}
/// @brief Return next element's pointer. Thread-safe
/// @return pointer to next available element
T *get_next()
{
return &arena[arena_idx.exchange(arena_idx++ % arena_size)];
}
};
然后我可以让我的 DataState class 看起来像这样:
class DataState
{
std::atomic<Event*> latest_event;
RingAllocator<Event> event_allocator;
public:
//pushes event atomically
void push_event(const Event __restrict__* e)
{
//store event
Event *new_ptr = event_allocator.get_next()
*new_ptr = *e;
//swap event pointers
latest_event.store(new_ptr, std::memory_order_release);
}
//pulls event atomically
Event pull_event()
{
return *(latest_event.load(std::memory_order_acquire));
}
};
只要我将环分配器的大小设置为可以并发调用函数的线程的最大数量,就不会有 pull_event 可以 return 覆盖数据的风险。此外,一切都超级本地化,因此间接不会导致缓存性能不佳。这种方法有什么可能的陷阱吗?
DataState
class:
我以为它会是一个堆栈或队列,但它不是,所以 push
/ pull
看起来不是很好的方法名称。 (否则实施完全是伪造的)。
它只是一个闩锁,可让您读取任何线程存储的最后一个事件。
没有什么可以阻止连续两次写入覆盖从未被读取的元素。也没有什么可以阻止你阅读同一个元素两次。
如果您只需要在某处复制小块数据,环形缓冲区似乎是一种不错的方法。但是如果你不想失去事件,我认为你不能这样使用它。相反,只需获取一个环形缓冲区条目,然后复制到它并在那里使用它。所以唯一的原子操作应该是递增环形缓冲区位置索引。
环形缓冲区
您可以使 get_next()
更有效率。此行执行原子 post-增量 (fetch_add) 和原子交换:
return &arena[arena_idx.exchange(arena_idx++ % arena_size)];
我什至不确定它是否安全,因为 xchg 可能会从另一个线程踩到 fetch_add。反正就算安全了,也不理想。
你不需要那个。确保 arena_size 始终是 2 的幂,那么您就不需要对共享计数器取模。你可以放手,让每个线程取模供自己使用。它最终会换行,但它是一个二进制整数,所以它会以 2 的幂换行,这是你的竞技场大小的倍数。
我建议存储一个 AND 掩码而不是一个大小,这样就没有 %
编译成 and
指令以外的任何东西的风险,即使它不是一个编译-时间常数。这确保我们避免使用 64 位整数 div
指令。
template<typename T> class RingAllocator {
T *arena;
std::atomic_size_t arena_idx;
const std::size_t size_mask; // maybe even make this a template parameter?
public:
RingAllocator<T>(std::size_t size)
: arena_idx(0), size_mask(size-1)
{
// verify that size is actually a power of two, so the mask is all-ones in the low bits, and all-zeros in the high bits.
// so that i % size == i & size_mask for all i
...
}
...
T *get_next() {
size_t idx = arena_idx.fetch_add(1, std::memory_order_relaxed); // still atomic, but we don't care which order different threads take blocks in
idx &= size_mask; // modulo our local copy of the idx
return &arena[idx];
}
};
如果使用 calloc
而不是 new + memset,分配 arena 会更有效率。 OS 在将它们提供给 user-space 进程之前已经将页面归零(以防止信息泄漏),因此将它们全部写入只是浪费工作。
arena = new T[size];
std::memset(arena, 0, sizeof(T) * size);
// vs.
arena = (T*)calloc(size, sizeof(T));
自己写页面确实会出错,所以它们都连接到真实的物理页面,而不是系统范围共享物理零页面的写时复制映射(就像它们在 new/malloc/calloc).在 NUMA 系统上,选择的物理页面可能取决于哪个线程实际接触了该页面,而不是哪个线程进行了分配。但是由于您正在重用池,第一个写入页面的核心可能不是最终使用它最多的核心。
也许可以在微基准测试/性能计数器中寻找一些东西。
您是否看过任何可用的 C++ Disruptor (Java) 端口?
虽然它们不是完整的端口,但它们可以提供您需要的一切。我目前正在开发功能更齐全的端口,但还没有完全准备好。
As long as I size my ring allocator to the max # of threads that may concurrently call the functions, there's no risk of overwriting data that pull_event could return. .... Any possible pitfalls with this approach?
陷阱是,IIUC,你的说法是错误的。
如果我只有 2 个线程,环形缓冲区中有 10 个元素,第一个线程可以调用 pull_event 一次,然后是 "mid-pulling",然后第二个线程可以调用 push 10 次,覆盖线程 1 正在拉动的内容。
同样,假设我正确理解您的代码。
另外,如上所述,
return &arena[arena_idx.exchange(arena_idx++ % arena_size)];
那个arena_idx++
里面的交换就同一个变量,只是看起来不对。而事实上是错误的。两个线程可以递增它 - ThreadA 递增到 8,threadB 递增到 9,然后 threadB 将其交换为 9,然后 threadA 将其交换为 8。哎呀。
原子(op1)@原子(op2)!=原子(op1@op2)
我担心未显示的代码还有什么问题。我并不是说这是一种侮辱 - 无锁并不容易。