C++ 中的保序 memcpy

Order-preserving memcpy in C++

我正在开发一个多核、多线程软件库,我想在其中提供更新顺序保留 可能跨越多个高速缓存行的无锁共享内存对象。

具体来说,假设我有一些缓存行大小对象的向量 X:X[0],... X[K] 每个都恰好占据一个缓存行。我按索引顺序写给他们:首先是 X[0],然后是 X[1],等等。如果线程 2 读取 X[K],它是否也会看到 X[0] 的状态 "at least as current" 作为它为 X[K] 看到了什么?

从同一个线程,显然我会看到尊重更新顺序的内存语义。但是现在,如果第二个线程读取 X[K],问题就出现了:是否会观察到对 X[0]...X[K-1] 的相应更新?

有了锁,我们确实得到了这个保证。但是使用 memcpy 将一些东西复制到向量中,我们失去了这个 属性:memcpy 有一个 POSIX 语义,它不能保证索引顺序更新或内存顺序更新或任何其他排序。你只是保证在 memcpy 完成后,整个更新已经执行。

我的问题:是否已经有一个具有类似速度但具有所需保证的顺序保留 memcpy?如果没有,是否可以在不锁定的情况下实现这样的原语?

假设我的目标平台是 x86 和 ARM。

(编者注:原先说的是Intel,所以OP可能不关心AMD。)

您描述的排序要求正是 release/acquire 语义所提供的。 (http://preshing.com/20120913/acquire-and-release-semantics/).

问题是高效保证原子loads/stores的原子性单位在所有 x86 和某些 ARM 上最多为 8 个字节。否则在其他 ARM 上只有 4 个字节。 ()。某些 Intel CPUs 可能在实践中有原子 32 甚至 64 字节 (AVX512) 存储,但 Intel 和 AMD 都没有做出任何官方保证。

我们甚至不知道 SIMD 向量存储在可能将宽对齐存储分解为多个 8 字节对齐块时是否有保证顺序。或者即使这些块是单独原子的。 Per-element atomicity of vector load/store and gather/scatter?有充分的理由相信它们是每个元素原子的,即使文档没有保证它。

如果大 "objects" 对性能至关重要,您可以考虑在您关心的特定服务器上测试向量 load/store 原子性,但就保证和让编译器使用它。 (有内在函数。)确保在不同套接字上的内核之间进行测试,以捕获由于 K10 Opteron 上套接字之间的 HyperTransport 而在 8 字节边界撕裂 SSE instructions: which CPUs can do atomic 16B memory operations? 的情况。这可能是一个非常糟糕的主意;您无法猜测在极少数情况下是否有任何微体系结构条件可以使宽向量存储成为非原子的,即使它通常看起来像原子的。


您可以轻松地对数组元素进行 release/acquire 排序,例如
alignas(64) atomic<uint64_t> arr[1024];.
你只需要很好地询问编译器:

copy_to_atomic(std::atomic<uint64_t> *__restrict dst_a, 
                      const uint64_t *__restrict src, size_t len) {
    const uint64_t *endsrc = src+len;
    while (src < src+len) {
        dst_a->store( *src, std::memory_order_release );
        dst_a++; src++;
    }
}

在 x86-64 上,它不会自动向量化或任何东西,因为编译器不会优化原子,并且因为没有文档表明使用向量存储原子元素数组的连续元素是安全的。 :( 所以这基本上很糟糕。看到它 on the Godbolt compiler explorer

我会考虑使用 volatile __m256i* 指针(对齐 load/store)和像 atomic_thread_fence(std::memory_order_release) 这样的编译器屏障来滚动你自己的指针,以防止编译时重新排序。每个元素 ordering/atomicity 应该没问题(但同样不能保证)。并且绝对不要指望整个 32 字节是原子的,只是更高的 uint64_t 元素写在较低的 uint64_t 元素之后(并且这些存储按该顺序对其他核心可见)。


在 ARM32 上:即使是 uint64_t 的原子存储也不是很好。 gcc 使用 ldrexd / strexd 对 (LL/SC),因为显然没有 8 字节原子纯存储。 (我用 gcc7.2 -O3 -march=armv7-a 编译。在 AArch32 模式下使用 armv8-a,store-pair 是原子的。AArch64 当然也有原子 8 字节 load/store。)


您必须避免使用普通的 C 库 memcpy 实现。 在 x86 上,它可以对大型副本使用弱排序存储,允许在其自己的存储之间重新排序(但不适用于不属于 memcpy 的后来的商店,因为这可能会破坏后来的发行商店。)

movnt 在矢量循环中绕过缓存存储,或在具有 ERMSB 功能的 CPU 上 rep movsb,都可以产生这种效果。 .

或者 memcpy 实现可以简单地选择先执行最后一个(部分)向量,然后再进入其主循环。

C 和 C++ 中 UB 中非 atomic 类型的并发写+读或写+写;这就是为什么 memcpy 有如此大的自由来做任何它想做的事情,包括使用弱排序存储,只要它在必要时使用 sfence 以确保 memcpy 作为一个整体尊重排序编译器期望它为以后的 mo_release 操作发出代码。

(即 x86 的当前 C++ 实现 std::atomic 假设没有让他们担心的弱排序存储。任何希望他们的 NT 存储尊重编译器生成的顺序的代码atomic<T> 代码必须使用 _mm_sfence()。或者,如果手动编写 asm,则直接使用 sfence 指令。或者,如果您想进行顺序释放存储,则只需使用 xchg 并给出您的 asm 函数也具有 atomic_thread_fence(mo_seq_cst) 的效果。)

我发现 这个问题很有见地、很详细,而且很有帮助。但是我没有看到他的建议写入代码,因此对于需要快速解决 DMA 或无锁算法的有序写入这个问题的后代和未来的人们,我包括了我根据该答案编写的代码。我在 x64 和 armv7-a 上使用 gcc 4.9 构建它,虽然我只 运行 它并在 x64 上测试它。

#include <atomic>
#include <stdlib.h>
#include <algorithm> // min

extern "C" {

static void * linear_memcpy_portable(void *__restrict dest, const void *__restrict src, size_t n)
{
   // Align dest if not already aligned
   if ((uintptr_t)dest & sizeof(uint64_t)) {
      uint8_t *__restrict dst8 = reinterpret_cast<uint8_t *__restrict>(dest);
      const uint8_t *__restrict src8 = reinterpret_cast<const uint8_t *__restrict>(src);
      const size_t align_n = std::min(n, (uintptr_t)dest & sizeof(uint64_t));
      const uint8_t * const endsrc8 = static_cast<const uint8_t * const>(src) + align_n;
      while (src8 < endsrc8) {
         *dst8 = *src8;
         atomic_thread_fence(std::memory_order_release);
         dst8++; src8++;
      }
      dest = dst8;
      src = src8;
      n = n - align_n;
   }
   typedef uint64_t __attribute__((may_alias,aligned(1))) aliasing_unaligned_uint64_t;
   uint64_t *__restrict dst64 = static_cast<uint64_t *__restrict>(dest);
   const aliasing_unaligned_uint64_t *__restrict src64 = static_cast<const aliasing_unaligned_uint64_t *__restrict>(src);
   const uint64_t * const endsrc64 = src64 + n / sizeof(uint64_t);
   const uint8_t * const endsrc8 = static_cast<const uint8_t * const>(src) + n;
   while (src64 < endsrc64) {
      *dst64 = *src64;
      atomic_thread_fence(std::memory_order_release);
      dst64++; src64++;
   }
   if (reinterpret_cast<const uint8_t * const>(endsrc64) != endsrc8) {
      uint8_t *__restrict dst8 = reinterpret_cast<uint8_t *__restrict>(dst64);
      const uint8_t *__restrict src8 = reinterpret_cast<const uint8_t *__restrict>(src64);
      while (src8 < endsrc8) {
         *dst8 = *src8;
         atomic_thread_fence(std::memory_order_release);
         dst8++; src8++;
      }
   }
   return dest;
}

#if (_M_AMD64 || __x86_64__)
#include <immintrin.h>
static void * linear_memcpy_avx2(void *dest, const void * src, size_t n) __attribute__((target("avx2")));
static void * linear_memcpy_avx2(void *dest, const void * src, size_t n)
{
   __m256i *__restrict dst256 = static_cast<__m256i *__restrict>(dest);
   const __m256i *__restrict src256 = static_cast<const __m256i *__restrict>(src);
   const __m256i * const endsrc256 = src256 + n / sizeof(__m256i);
   const uint8_t * const endsrc8 = static_cast<const uint8_t *>(src) + n;
   while (src256 < endsrc256) {
      _mm256_storeu_si256(dst256, _mm256_loadu_si256(src256));
      atomic_thread_fence(std::memory_order_release);
      dst256++; src256++;
   }
   if (reinterpret_cast<const uint8_t * const>(endsrc256) != endsrc8)
      linear_memcpy_portable(dst256, src256, endsrc8 - reinterpret_cast<const uint8_t * const>(endsrc256));
   return dest;
}

static void * linear_memcpy_sse2(void *dest, const void * src, size_t n) __attribute__((target("sse2")));
static void * linear_memcpy_sse2(void *dest, const void * src, size_t n)
{
   __m128i *__restrict dst128 = static_cast<__m128i *__restrict>(dest);
   const __m128i *__restrict src128 = static_cast<const __m128i *__restrict>(src);
   const __m128i * const endsrc128 = src128 + n / sizeof(__m128i);
   const uint8_t * const endsrc8 = static_cast<const uint8_t *>(src) + n;
   while (src128 < endsrc128) {
      _mm_storeu_si128(dst128, _mm_loadu_si128(src128));
      atomic_thread_fence(std::memory_order_release);
      dst128++; src128++;
   }
   if (reinterpret_cast<const uint8_t * const>(endsrc128) != endsrc8)
      linear_memcpy_portable(dst128, src128, endsrc8 - reinterpret_cast<const uint8_t * const>(endsrc128));
   return dest;
}

static void *(*resolve_linear_memcpy(void))(void *, const void *, size_t)
{
   __builtin_cpu_init();
   // All x64 targets support a minimum of SSE2
   return __builtin_cpu_supports("avx2") ? linear_memcpy_avx2 : linear_memcpy_sse2;
}
#ifdef __AVX2__
// IF AVX2 is specified to the compiler, alias to the avx2 impl so it can be inlined
void * linear_memcpy(void *, const void *, size_t) __attribute__((alias("linear_memcpy_avx2")));
#else
void * linear_memcpy(void *, const void *, size_t) __attribute__((ifunc("resolve_linear_memcpy")));
#endif
#else
void * linear_memcpy(void *, const void *, size_t) __attribute__((alias("linear_memcpy_portable")));
#endif

} // extern "C"

我欢迎任何有关实施的反馈。 :)