使用原子的无锁单生产者多消费者数据结构

lock free single producer multiple consumer data struct using atomic

我最近有如下示例代码(实际代码要复杂得多)。看了 Hans Boehm 的 cppcon16 talk on atomic 后,有点担心我的代码能不能用。

produce被单个生产者线程调用,consume被多个消费者线程调用。 Producer 只更新序号为 2, 4, 6, 8,... 的数据,但在更新数据之前设置为奇数序号,如 1, 3, 5, 7, ... 以指示数据可能是脏的。消费者也尝试以相同的顺序(2、4、6、...)获取数据。

消费者在读取后仔细检查序列号以确保数据良好(在读取期间生产者未更新)。

我认为我的代码在 x86_64(我的目标平台)上运行良好,因为 x86_64 不会将商店与其他商店重新排序,也不会加载商店或负载,但我怀疑它在其他平台上是错误的平台。

我是否正确认为数据分配(在生产中)可以移动到 'store(n-1)' 之上,以便消费者读取损坏的数据但 t == t2 仍然成功?

struct S 
{
    atomic<int64_t> seq;
    // data members of primitive type int, double etc    
    ...
};

S s;

void produce(int64_t n, ...) // ... for above data members
{
    s.seq.store(n-1, std::memory_order_release); // indicates it's working on data members

    // assign data members of s
    ...

    s.seq.store(n, std::memory_order_release); // complete updating
}

bool consume(int64_t n, ...) // ... for interested fields passed as reference
{
    auto t = s.load(std::memory_order_acquire);

    if (t == n)
    {
        // read fields
        ...

        auto t2 = s.load(std::memory_order_acquire);
        if (t == t2)
            return true;
    }        

    return false;
}

Compile-time reordering 在以 x86 为目标时仍然会咬你,因为编译器优化以保留程序在 C++ 抽象机上的行为,而不是任何更强的依赖于体系结构的行为。由于我们要避免 memory_order_seq_cst,因此允许重新排序。

是的,您的商店可以按照您的建议重新订购。您的负载也可以使用 t2 负载重新排序,因为 an acquire-load is only a one-way barrier. It would be legal for a compiler to optimize away the t2 check entirely. If a reordering is possible, the compiler is allowed to decide that it's what always happens and apply the as-if rule to make more efficient code. (Current compilers usually don't, but this is definitely allowed by the current standard as written. See 。)

防止重新排序的选项是:

  • 使所有数据成员stores/loads 具有释放和获取语义的原子性。 (最后一个数据成员的获取加载将阻止 t2 加载首先完成。)
  • 使用barriers (aka fences)将所有非原子存储和非原子加载作为一个组进行排序。

    正如 Jeff Preshing 解释的那样,a mo_release fence isn't the same thing as a mo_release store 是我们需要的那种双向屏障。 std::atomic 只是回收 std::mo_ 名称,而不是为围栏指定不同的名称。

    (顺便说一句,非原子的 stores/loads 应该是 mo_relaxed 的原子,因为从技术上讲,在它们可能处于被重写的过程中读取它们在技术上是未定义的行为,即使如果您决定不看您阅读的内容。)


void produce(int64_t n, ...) // ... for above data members
{
    /*********** changed lines ************/
    std::atomic_signal_fence(std::memory_order_release);  // compiler-barrier to make sure the compiler does the seq store as late as possible (to give the reader more time with it valid).
    s.seq.store(n-1, std::memory_order_relaxed);          // changed from release
    std::atomic_thread_fence(std::memory_order_release);  // StoreStore barrier prevents reordering of the above store with any below stores.  (It's also a LoadStore barrier)
    /*********** end of changes ***********/

    // assign data members of s
    ...

    // release semantics prevent any preceding stores from being delayed past here
    s.seq.store(n, std::memory_order_release); // complete updating
}



bool consume(int64_t n, ...) // ... for interested fields passed as reference
{
    if (n == s.seq.load(std::memory_order_acquire))
    {
        // acquire semantics prevent any reordering with following loads

        // read fields
        ...

    /*********** changed lines ************/
        std::atomic_thread_fence(std::memory_order_acquire);  // LoadLoad barrier (and LoadStore)
        auto t2 = s.seq.load(std::memory_order_relaxed);    // relaxed: it's ordered by the fence and doesn't need anything extra
        // std::atomic_signal_fence(std::memory_order_acquire);  // compiler barrier: probably not useful on the load side.
    /*********** end of changes ***********/
        if (n == t2)
            return true;
    }

    return false;
}

注意额外的编译器障碍(signal_fence 仅影响编译时重新排序)以确保编译器不会将一次迭代中的第二个存储与下一次迭代中的第一个存储合并,如果这样是 运行 在一个循环中。或者更一般地说,确保使区域无效的存储尽可能晚地完成,以减少误报。 (对于真正的编译器可能没有必要,并且在调用此函数之间有大量代码。但是 signal_fence 永远不会编译成任何指令,并且似乎比将第一个存储保持为 mo_release 更好的选择。在在释放存储和线程栅栏都编译为额外指令的架构中,宽松的存储避免了两个单独的屏障指令。)

我还担心第一个商店可能会与上一次迭代的发布商店重新排序。但我认为这永远不会发生,因为两家商店都在同一个地址。 (在编译时,也许标准允许恶意编译器执行此操作,但任何理智的编译器如果认为一个可以通过另一个,就会根本不执行其中一个存储。)在 运行 时间在弱序架构上,我不确定同一地址的存储是否会在全局范围内变得无序可见。 这在现实生活中应该不是问题,因为生产者可能不会被连续调用。


顺便说一句,您使用的同步技术是 Seqlock,但只有一个写入器。您只有序列部分,没有用于同步单独编写器的锁定部分。在多作者版本中,作者会在 reading/writing 序列号和数据之前获取锁。 (而不是将 seq no 作为函数 arg,您将从锁中读取它)。

C++ 标准讨论论文 N4455 (about compiler-optimizations of atomics, see the second half of my answer on ) 以它为例。

他们对写入器中的数据项使用发布存储,而不是 StoreStore 栅栏。 (对于原子数据项,正如我所提到的,这确实是正确的)。

void writer(T d1, T d2) {
  unsigned seq0 = seq.load(std::memory_order_relaxed);  // note that they read the current value because it's presumably a multiple-writers implementation.
  seq.store(seq0 + 1, std::memory_order_relaxed);
  data1.store(d1, std::memory_order_release);
  data2.store(d2, std::memory_order_release);
  seq.store(seq0 + 2, std::memory_order_release);
}

他们谈论让 reader 的序列号的第二次加载可能会在以后的操作中重新排序,如果编译器这样做有利可图,并在 [=] 中使用 t2 = seq.fetch_add(0, std::memory_order_release) 76=] 作为获取释放语义负载的潜在方式。对于当前的编译器,我推荐;您可能会在 x86 上进行 locked 操作,而我上面建议的方式没有任何(或任何实际的屏障指令,因为只有全屏障 seq_cst 栅栏需要 x86 上的指令).