使用 std::atomic 的 C++ 线程安全增量,带模而不带互斥锁

C++ thread-safe increment with modulo without mutex using std::atomic

我需要一个以循环方式使用的线程安全缓冲区对象池。我通常会在其中放置一个互斥体以使增量和模线程安全,但是是否可以使用 std::atomic 来编写它?这是一个示例界面。如果它使事情变得更容易,缓冲区的总数可以是 2 的幂。下一个缓冲区索引永远不会在 class.

之外访问
class Buffer;

class BufferManager
{
public:

    BufferManager( size_t totalBuffers = 8 ) : mNextBufferIndex( 0 ), mTotalBuffers( totalBuffers )
    {
        mBuffers = new Buffer*[mTotalBuffers];
    }

    Buffer* GetNextBuffer()
    {
        // How to make this operation atomic?
        size_t index = mNextBufferIndex; 

        mNextBufferIndex = ( mNextBufferIndex + 1 ) % mTotalBuffers;

        return mBuffers[index];
    }

private:
    Buffer**            mBuffers;
    size_t              mNextBufferIndex;
    size_t              mTotalBuffers;
};

当你有两个输入时,这是不可能的。你可以用原子做的就是使用 CPU 支持的原子指令(我还没有听说过可以做递增加模运算的芯片),或者你可以先做计算然后设置值如果输入没有改变 - 但您的函数确实有两个输入,所以这也不起作用。

您可以只声明 mNextBufferIndex 一个 std::atomic_ullong 然后使用

return mBuffers[(mNextBufferIndex++) % mTotalBuffers];

增量将是原子的,您在返回之前计算模数。

使用非常大的无符号数将避免计数器回绕时出现的问题。

取模后可以放心使用

std::atomic<size_t> mNextBufferIndex;

    Buffer* GetNextBuffer()
    {
       // How to make this operation atomic?
       size_t index = mNextBufferIndex ++; 

       size_t id = index % mTotalBuffers;

       // If size could wrap, then re-write the modulo value.
       // oldValue keeps getting re-read.  
       // modulo occurs when nothing else updates it.
       size_t oldValue =mNextBufferIndex;
       size_t newValue = oldValue % mTotalBuffers;
       while (!m_mNextBufferIndex.compare_exchange_weak( oldValue, newValue, std::memory_order_relaxed ) )
            newValue = oldValue % mTotalBuffers; 
       return mBuffers[id ];
    }

据我所知,有硬件辅助联锁操作。一种这样的操作是增量。你不需要让它变得更复杂,因为模运算可以独立于增量运算。

std::atomic 确实使 operator++ 过载,我认为它具有原子保证。

Buffer* GetNextBuffer()
{
   // Once this inc operation has run the return value
   // is unique and local to this thread the modulo operation
   // does not factor into the consistency model 
   auto n = mNextBufferIndex++; 
   auto pool_index = n % mTotalBuffers;
   return mBuffers[pool_index];
}

如果你想用模数或任何其他复杂的算法来做到这一点,你可以使用比较和交换版本。

比较和交换或比较和交换之间的想法是,你进行计算,当你想将值写回内存位置(共享或其他)时,只有在没有其他人未修改的情况下,它才会成功同时的值(如果他们这样做,您只需重试该操作,忙等待)。这只需要一个可预测的编号方案,这通常是很有可能做到的。

Buffer* GetNextBuffer()
{
   // Let's assume that we wanted to do this
   auto n = (mNextBufferIndex % mTotalBuffers) ++; 
   mNextBufferIndex = n;
   return mBuffers[n];
}

假设 mNextBufferIndex 是一个 std::atomic

Buffer* GetNextBuffer()
{
   // Let's assume that we wanted to do this
   auto n = (mNextBufferIndex % mTotalBuffers)++;
   // This will now either succeed or not in the presence of concurrency
   while (!std::compare_exchange_weak(mNextBufferIndex, n)) {
     n = (mNextBufferIndex % mTotalBuffers)++;
   }
   return mBuffers[n];
}

您可能认为这更类似于乐观并发控制,但如果您将自己局限于对什么是原子的定义太狭窄,您将无法完成任何事情。

撇开我在这里计算的内容完全是胡说八道,它展示了 compare_exchange 操作的强大程度以及如何使用它来使任何算术原子化。真正的问题是当你有多个相互依赖的计算时。在这种情况下,您需要编写大量恢复例程。

不过,互锁操作本身并不是空闲的,并且会逐出处理器中的缓存行。

作为参考,我可以推荐 Mike Acton 的 paper slides 关于 增量问题