使用 std::atomic 的 C++ 线程安全增量,带模而不带互斥锁
C++ thread-safe increment with modulo without mutex using std::atomic
我需要一个以循环方式使用的线程安全缓冲区对象池。我通常会在其中放置一个互斥体以使增量和模线程安全,但是是否可以使用 std::atomic 来编写它?这是一个示例界面。如果它使事情变得更容易,缓冲区的总数可以是 2 的幂。下一个缓冲区索引永远不会在 class.
之外访问
class Buffer;
class BufferManager
{
public:
BufferManager( size_t totalBuffers = 8 ) : mNextBufferIndex( 0 ), mTotalBuffers( totalBuffers )
{
mBuffers = new Buffer*[mTotalBuffers];
}
Buffer* GetNextBuffer()
{
// How to make this operation atomic?
size_t index = mNextBufferIndex;
mNextBufferIndex = ( mNextBufferIndex + 1 ) % mTotalBuffers;
return mBuffers[index];
}
private:
Buffer** mBuffers;
size_t mNextBufferIndex;
size_t mTotalBuffers;
};
当你有两个输入时,这是不可能的。你可以用原子做的就是使用 CPU 支持的原子指令(我还没有听说过可以做递增加模运算的芯片),或者你可以先做计算然后设置值如果输入没有改变 - 但您的函数确实有两个输入,所以这也不起作用。
您可以只声明 mNextBufferIndex
一个 std::atomic_ullong
然后使用
return mBuffers[(mNextBufferIndex++) % mTotalBuffers];
增量将是原子的,您在返回之前计算模数。
使用非常大的无符号数将避免计数器回绕时出现的问题。
取模后可以放心使用
std::atomic<size_t> mNextBufferIndex;
Buffer* GetNextBuffer()
{
// How to make this operation atomic?
size_t index = mNextBufferIndex ++;
size_t id = index % mTotalBuffers;
// If size could wrap, then re-write the modulo value.
// oldValue keeps getting re-read.
// modulo occurs when nothing else updates it.
size_t oldValue =mNextBufferIndex;
size_t newValue = oldValue % mTotalBuffers;
while (!m_mNextBufferIndex.compare_exchange_weak( oldValue, newValue, std::memory_order_relaxed ) )
newValue = oldValue % mTotalBuffers;
return mBuffers[id ];
}
据我所知,有硬件辅助联锁操作。一种这样的操作是增量。你不需要让它变得更复杂,因为模运算可以独立于增量运算。
std::atomic
确实使 operator++
过载,我认为它具有原子保证。
Buffer* GetNextBuffer()
{
// Once this inc operation has run the return value
// is unique and local to this thread the modulo operation
// does not factor into the consistency model
auto n = mNextBufferIndex++;
auto pool_index = n % mTotalBuffers;
return mBuffers[pool_index];
}
如果你想用模数或任何其他复杂的算法来做到这一点,你可以使用比较和交换版本。
比较和交换或比较和交换之间的想法是,你进行计算,当你想将值写回内存位置(共享或其他)时,只有在没有其他人未修改的情况下,它才会成功同时的值(如果他们这样做,您只需重试该操作,忙等待)。这只需要一个可预测的编号方案,这通常是很有可能做到的。
Buffer* GetNextBuffer()
{
// Let's assume that we wanted to do this
auto n = (mNextBufferIndex % mTotalBuffers) ++;
mNextBufferIndex = n;
return mBuffers[n];
}
假设 mNextBufferIndex
是一个 std::atomic
。
Buffer* GetNextBuffer()
{
// Let's assume that we wanted to do this
auto n = (mNextBufferIndex % mTotalBuffers)++;
// This will now either succeed or not in the presence of concurrency
while (!std::compare_exchange_weak(mNextBufferIndex, n)) {
n = (mNextBufferIndex % mTotalBuffers)++;
}
return mBuffers[n];
}
您可能认为这更类似于乐观并发控制,但如果您将自己局限于对什么是原子的定义太狭窄,您将无法完成任何事情。
撇开我在这里计算的内容完全是胡说八道,它展示了 compare_exchange
操作的强大程度以及如何使用它来使任何算术原子化。真正的问题是当你有多个相互依赖的计算时。在这种情况下,您需要编写大量恢复例程。
不过,互锁操作本身并不是空闲的,并且会逐出处理器中的缓存行。
作为参考,我可以推荐 Mike Acton 的 paper slides 关于 增量问题 。
我需要一个以循环方式使用的线程安全缓冲区对象池。我通常会在其中放置一个互斥体以使增量和模线程安全,但是是否可以使用 std::atomic 来编写它?这是一个示例界面。如果它使事情变得更容易,缓冲区的总数可以是 2 的幂。下一个缓冲区索引永远不会在 class.
之外访问class Buffer;
class BufferManager
{
public:
BufferManager( size_t totalBuffers = 8 ) : mNextBufferIndex( 0 ), mTotalBuffers( totalBuffers )
{
mBuffers = new Buffer*[mTotalBuffers];
}
Buffer* GetNextBuffer()
{
// How to make this operation atomic?
size_t index = mNextBufferIndex;
mNextBufferIndex = ( mNextBufferIndex + 1 ) % mTotalBuffers;
return mBuffers[index];
}
private:
Buffer** mBuffers;
size_t mNextBufferIndex;
size_t mTotalBuffers;
};
当你有两个输入时,这是不可能的。你可以用原子做的就是使用 CPU 支持的原子指令(我还没有听说过可以做递增加模运算的芯片),或者你可以先做计算然后设置值如果输入没有改变 - 但您的函数确实有两个输入,所以这也不起作用。
您可以只声明 mNextBufferIndex
一个 std::atomic_ullong
然后使用
return mBuffers[(mNextBufferIndex++) % mTotalBuffers];
增量将是原子的,您在返回之前计算模数。
使用非常大的无符号数将避免计数器回绕时出现的问题。
取模后可以放心使用
std::atomic<size_t> mNextBufferIndex;
Buffer* GetNextBuffer()
{
// How to make this operation atomic?
size_t index = mNextBufferIndex ++;
size_t id = index % mTotalBuffers;
// If size could wrap, then re-write the modulo value.
// oldValue keeps getting re-read.
// modulo occurs when nothing else updates it.
size_t oldValue =mNextBufferIndex;
size_t newValue = oldValue % mTotalBuffers;
while (!m_mNextBufferIndex.compare_exchange_weak( oldValue, newValue, std::memory_order_relaxed ) )
newValue = oldValue % mTotalBuffers;
return mBuffers[id ];
}
据我所知,有硬件辅助联锁操作。一种这样的操作是增量。你不需要让它变得更复杂,因为模运算可以独立于增量运算。
std::atomic
确实使 operator++
过载,我认为它具有原子保证。
Buffer* GetNextBuffer()
{
// Once this inc operation has run the return value
// is unique and local to this thread the modulo operation
// does not factor into the consistency model
auto n = mNextBufferIndex++;
auto pool_index = n % mTotalBuffers;
return mBuffers[pool_index];
}
如果你想用模数或任何其他复杂的算法来做到这一点,你可以使用比较和交换版本。
比较和交换或比较和交换之间的想法是,你进行计算,当你想将值写回内存位置(共享或其他)时,只有在没有其他人未修改的情况下,它才会成功同时的值(如果他们这样做,您只需重试该操作,忙等待)。这只需要一个可预测的编号方案,这通常是很有可能做到的。
Buffer* GetNextBuffer()
{
// Let's assume that we wanted to do this
auto n = (mNextBufferIndex % mTotalBuffers) ++;
mNextBufferIndex = n;
return mBuffers[n];
}
假设 mNextBufferIndex
是一个 std::atomic
。
Buffer* GetNextBuffer()
{
// Let's assume that we wanted to do this
auto n = (mNextBufferIndex % mTotalBuffers)++;
// This will now either succeed or not in the presence of concurrency
while (!std::compare_exchange_weak(mNextBufferIndex, n)) {
n = (mNextBufferIndex % mTotalBuffers)++;
}
return mBuffers[n];
}
您可能认为这更类似于乐观并发控制,但如果您将自己局限于对什么是原子的定义太狭窄,您将无法完成任何事情。
撇开我在这里计算的内容完全是胡说八道,它展示了 compare_exchange
操作的强大程度以及如何使用它来使任何算术原子化。真正的问题是当你有多个相互依赖的计算时。在这种情况下,您需要编写大量恢复例程。
不过,互锁操作本身并不是空闲的,并且会逐出处理器中的缓存行。
作为参考,我可以推荐 Mike Acton 的 paper slides 关于 增量问题 。