此发布/检查更新 class 对于单个作者 + reader 是否可以使用 memory_order_relaxed 或 acquire/release 来提高效率?
Could this publish / check-for-update class for a single writer + reader use memory_order_relaxed or acquire/release for efficiency?
简介
我有一个小的 class,它利用 std::atomic 进行无锁操作。由于这个 class 被大量调用,它影响了性能,我遇到了麻烦。
Class 描述
class类似于后进先出,但是一旦pop()函数被调用,它只会return它的ring-buffer最后写入的元素(只有自从有新元素最后弹出())。
一个线程正在调用 push(),另一个线程正在调用 pop()。
我读过的来源
由于这占用了我太多的电脑时间,我决定进一步研究 std::atomic class 及其 memory_order。我在 Whosebug 和其他资源和书籍中阅读了很多可用的 memory_order post,但我无法清楚地了解不同的模式。特别是,我在获取和释放模式之间挣扎:我也看不出为什么它们与 memory_order_seq_cst.
不同
我认为每个记忆顺序使用我自己的研究
memory_order_relaxed:在同一个线程中,原子操作是即时的,但其他线程可能无法立即看到最新的值,他们需要一些时间直到它们被更新。代码可以由编译器或 OS.
自由重新排序
memory_order_获取/发布: 由 atomic::load 使用。它防止在此之前的代码行被重新排序(compiler/OS 可能会在此行之后重新排序),并使用 [=101 读取存储在此原子上的最新值=]_release 或 memory_order_seq_cst 在这个线程或另一个线程中。 memory_order_release 还可以防止该代码在重新排序后出现。因此,在 acquire/release 中,两者之间的所有代码都可以被 OS 打乱。我不确定那是在同一线程之间,还是在不同线程之间。
memory_order_seq_cst:最简单易用,因为它就像我们使用变量的自然写法,立即刷新其他线程加载函数的值。
LockFreeEx class
template<typename T>
class LockFreeEx
{
public:
void push(const T& element)
{
const int wPos = m_position.load(std::memory_order_seq_cst);
const int nextPos = getNextPos(wPos);
m_buffer[nextPos] = element;
m_position.store(nextPos, std::memory_order_seq_cst);
}
const bool pop(T& returnedElement)
{
const int wPos = m_position.exchange(-1, std::memory_order_seq_cst);
if (wPos != -1)
{
returnedElement = m_buffer[wPos];
return true;
}
else
{
return false;
}
}
private:
static constexpr int maxElements = 8;
static constexpr int getNextPos(int pos) noexcept {return (++pos == maxElements)? 0 : pos;}
std::array<T, maxElements> m_buffer;
std::atomic<int> m_position {-1};
};
我希望如何改进它
所以,我的第一个想法是在所有原子操作中使用 memory_order_relaxed,因为 pop() 线程处于循环中,每 10-15 毫秒在 pop 函数中寻找可用的更新,然后它被允许最初的 pop() 函数失败,后来才意识到有新的更新。只是几毫秒而已。
另一种选择是使用 release/acquire - 但我不确定它们。在 all store() 中使用 release,在 all load() 函数中使用 acquire。
不幸的是,我描述的所有 memory_order 似乎都有效,而且我不确定它们何时会失败,如果它们应该会失败的话。
决赛
请问,如果您在此处使用宽松的内存顺序时发现问题,能否告诉我?或者我应该使用 release/acquire (也许对这些的进一步解释可以帮助我)?为什么?
我认为 relaxed 最适合这个 class,在它的所有 store() 或 load() 中。但我不确定!
感谢阅读。
编辑:额外说明:
看到大家都在问'char',所以改成int,问题解决!但这不是我要解决的问题。
正如我之前所说,class 可能是后进先出法,但只有最后一个元素被推入的地方才重要,如果有的话。
我有一个很大的结构 T(可复制和可分配),我必须以无锁方式在两个线程之间共享它。所以,我知道的唯一方法是使用一个循环缓冲区来写入 T 的最后一个已知值,以及一个知道最后一个写入值的索引的原子。如果没有,索引将为 -1。
请注意,我的推送线程必须知道何时有 "new T" 可用,这就是为什么 pop() return 是一个布尔值。
再次感谢所有试图帮助我处理记忆命令的人! :)
阅读解决方案后:
template<typename T>
class LockFreeEx
{
public:
LockFreeEx() {}
LockFreeEx(const T& initValue): m_data(initValue) {}
// WRITE THREAD - CAN BE SLOW, WILL BE CALLED EACH 500-800ms
void publish(const T& element)
{
// I used acquire instead relaxed to makesure wPos is always the lastest w_writePos value, and nextPos calculates the right one
const int wPos = m_writePos.load(std::memory_order_acquire);
const int nextPos = (wPos + 1) % bufferMaxSize;
m_buffer[nextPos] = element;
m_writePos.store(nextPos, std::memory_order_release);
}
// READ THREAD - NEED TO BE VERY FAST - CALLED ONCE AT THE BEGGINING OF THE LOOP each 2ms
inline void update()
{
// should I change to relaxed? It doesn't matter I don't get the new value or the old one, since I will call this function again very soon, and again, and again...
const int writeIndex = m_writePos.load(std::memory_order_acquire);
// Updating only in case there is something new... T may be a heavy struct
if (m_readPos != writeIndex)
{
m_readPos = writeIndex;
m_data = m_buffer[m_readPos];
}
}
// NEED TO BE LIGHTNING FAST, CALLED MULTIPLE TIMES IN THE READ THREAD
inline const T& get() const noexcept {return m_data;}
private:
// Buffer
static constexpr int bufferMaxSize = 4;
std::array<T, bufferMaxSize> m_buffer;
std::atomic<int> m_writePos {0};
int m_readPos = 0;
// Data
T m_data;
};
内存顺序与您何时看到对原子对象的某些特定更改无关,而是与此更改可以保证周围代码的内容有关。 Relaxed atomics 除了对原子对象本身的更改外,什么都不保证:更改将是原子的。但是您不能在任何同步上下文中使用松散原子。
并且您有一些代码需要同步。您想要弹出已推送的内容,而不是尝试弹出尚未推送的内容。因此,如果您使用宽松的操作,则无法保证您的 pop 会看到此推送代码:
m_buffer[nextPos] = element;
m_position.store(nextPos, std::memory_relaxed);
如上所写。也可以这样看:
m_position.store(nextPos, std::memory_relaxed);
m_buffer[nextPos] = element;
因此您可能会尝试从缓冲区中获取一个尚不存在的元素。因此,您必须使用一些同步并且至少使用 acquire/release 内存顺序。
以及您的实际代码。我觉得顺序可以如下:
const char wPos = m_position.load(std::memory_order_relaxed);
...
m_position.store(nextPos, std::memory_order_release);
...
const char wPos = m_position.exchange(-1, memory_order_acquire);
你的作者只需要release
,不需要seq-cst,但是relaxed
太弱了。在 m_buffer[]
条目的非原子赋值 之后 之前,您不能发布 m_position
的值。 您需要发布顺序以确保 m_position
存储仅对其他线程可见 在 所有较早的内存操作之后。 (包括非原子分配)。 https://preshing.com/20120913/acquire-and-release-semantics/
这必须 "synchronize-with" 获取或 seq_cst 加载到 reader。或者在 reader.
中至少 mo_consume
理论上你还需要wpos = m_position
至少是acquire
(或者consume
中的reader),不放松,因为C++11的内存模型对于值预测之类的事情来说足够弱,它可以让编译器在加载实际从连贯缓存中获取值之前推测性地使用 wPos
的值。
(在实际 CPUs 上的实践中,疯狂的编译器可以使用 test/branch 来引入控制依赖性,从而允许分支预测 + 推测执行来打破可能值的数据依赖性wPos
.)
但是普通的编译器不会那样做。在除 DEC Alpha 之外的 CPU 上,wPos = m_position
源代码中的数据依赖性然后使用 m_buffer[wPos]
将在 asm 中创建数据依赖性,如 mo_consume
应该从中获利。 Alpha 以外的真实 ISA 保证依赖加载的依赖顺序。 (甚至在 Alpha 上,使用宽松的原子交换可能足以关闭存在于少数允许重新排序的真实 Alpha CPU 上的微小 window。)
为 x86 编译时,使用 mo_acquire
没有任何缺点;它不需要任何额外的障碍。在其他 ISA 上可能存在,例如 32 位 ARM,其中 acquire
成本障碍,因此 "cheating" 具有宽松的负载可能是一个胜利,在实践中仍然是安全的。当前的编译器总是将 mo_consume
强化为 mo_acquire
,因此很遗憾我们无法利用它。
即使使用 seq_cst
,您也已经有了真实的竞争条件。
- 初始状态:
m_position = 0
- reader "claims" 插槽 0 通过在
m_position = -1
中交换并读取 部分 m_buffer[0];
- reader 出于某种原因休眠(例如定时器中断取消调度),或者只是与作家赛跑。
- 编写器将
wPos = m_position
读取为 -1
,并计算 nextPos = 0
。
覆盖部分读取的m_buffer[0]
- reader一觉醒来读完了,撕破了
T &element
。 C++抽象机中的数据竞赛UB,实践中的撕逼
在读取后添加对 m_position
的第二次检查(如 SeqLock)无法在所有情况下检测到这一点,因为编写器直到 后才更新 m_position
写入缓冲区元素。
即使您的实际用例在读取和写入之间存在很长的间隔,这个缺陷也会让您几乎同时发生一次读取和写入。
I for sure know that the read side cannot wait for nothing and cannot be stopped (it's audio) and it's poped each 5-10ms, and the write side is the user input, which is more slower, a faster one could do a push once each 500ms.
一毫秒在现代 CPU 上是 ages。线程间延迟通常约为 60 ns,因此不到一微秒,例如来自四核 Intel x86。只要你不睡在互斥锁上,在放弃之前旋转重试一两次就不是问题。
代码审查:
The class similar to a LIFO, but once the pop() function is called, it only return the last written element of its ring-buffer (only if there are new elements since last pop()).
这不是真正的队列或堆栈:push 和 pop 不是好名字。 "publish" 和 "read" 或 "get" 可能更好,并使它的用途更加明显。
我会在代码中加入注释来描述这样一个事实,即这对单个作者、多个 reader 是安全的。 (push
中 m_position
的非原子增量使得它对多个编写者显然不安全。)
即便如此,1个作者+1个reader运行ning同时出现还是有点奇怪。如果在写入正在进行时读取开始,它将获得 "old" 值,而不是自旋等待几分之一微秒以获取新值。然后下次它读取时已经有一个新的值在等待;上次它错过的那个。所以例如m_position
可以按顺序更新:2, -1, 3.
这可能是可取的,也可能不是可取的,这取决于 "stale" 数据是否具有任何价值,以及如果作者在写入过程中休眠时 reader 阻塞的可接受性。甚至在作者不睡觉的情况下,旋转等待。
很少写入的具有 多个 只读 readers 的小型数据的标准模式是 SeqLock。例如用于在无法自动读取或写入 128 位值的 CPU 上发布 128 位当前时间戳。参见
可能的设计更改
为了确保安全,我们可以让 writer 运行 自由,始终围绕其循环缓冲区,并且 让 reader 跟踪它查看的最后一个元素在.
如果只有一个reader,这应该是一个简单的非原子变量。如果它是一个实例变量,至少把它放在写位置 m_buffer[]
的另一边。
// Possible failure mode: writer wraps around between reads, leaving same m_position
// single-reader
const bool read(T &elem)
{
// FIXME: big hack to get this in a separate cache line from the instance vars
// maybe instead use alignas(64) int m_lastread as a class member, and/or on the other side of m_buffer from m_position.
static int lastread = -1;
int wPos = m_position.load(std::memory_order_acquire); // or cheat with relaxed to get asm that's like "consume"
if (lastread == wPos)
return false;
elem = m_buffer[wPos];
lastread = wPos;
return true;
}
您希望 lastread
与作者写入的内容在单独的缓存行中。否则 reader 的 readPos 更新会变慢,因为与 writer 的写入存在虚假共享,反之亦然。
这让 reader(s) 成为真正的只读 wrt。作者写入的缓存行。不过,它仍然需要 MESI 流量来请求对在作者写入后处于修改状态的行的读取访问权限。但是写入器仍然可以 read m_position
而不会丢失缓存,因此它可以立即将其存储到存储缓冲区中。它只需要等待 RFO 获得缓存行的独占所有权,然后才能将元素和更新的 m_position
从其存储缓冲区提交到 L1d 缓存。
TODO:让 m_position
递增而不手动换行,因此我们有一个写入序列号需要很长时间才能换行,避免早期假阴性来自 lastread == wPos
.
使用wPos & (maxElements-1)
作为索引。并且 static_assert(maxElements & (maxElements-1) == 0, "maxElements must be a power of 2");
那么唯一的危险是在很短的时间内未被发现的撕裂-window 如果编写器一直环绕并且正在写入正在读取的元素。对于频繁读取和不频繁写入,以及不太小的缓冲区,这种情况永远不会发生。读取后再次检查 m_position
(类似于 SeqLock,类似于下面)将竞争 window 缩小为仅仍在进行中的写入。
如果有多个 reader,另一个好的选择可能是每个 m_buffer
条目 中的 claimed
标志。所以你会定义
template<typename T>
class WaitFreePublish
{
private:
struct {
alignas(32) T elem; // at most 2 elements per cache line
std::atomic<int8_t> claimed; // writers sets this to 0, readers try to CAS it to 1
// could be bool if we don't end up needing 3 states for anything.
// set to "1" in the constructor? or invert and call it "unclaimed"
} m_buffer[maxElements];
std::atomic<int> m_position {-1};
}
如果 T
末尾有填充,很遗憾我们不能将其用于 claimed
标志:/
这避免了比较位置可能的失败模式:如果写入器在读取之间回绕,最糟糕的情况就是撕裂。我们可以通过让作者先清除 claimed
标志来检测这种撕裂,然后再写入元素的其余部分。
没有其他线程写入m_position
,我们绝对可以放心地使用宽松的负载。我们甚至可以在其他地方缓存写入位置,但 reader 希望不会经常使包含 m_position
的缓存行无效。显然在您的用例中,writer performance/latency 可能没什么大不了的。
所以作者 + reader 可能看起来像这样,使用已知更新顺序的 SeqLock 式撕裂检测来声明标志、元素和 m_position。
/// claimed flag per array element supports concurrent readers
// thread-safety: single-writer only
// update claimed flag first, then element, then m_position.
void publish(const T& elem)
{
const int wPos = m_position.load(std::memory_order_relaxed);
const int nextPos = getNextPos(wPos);
m_buffer[nextPos].claimed.store(0, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_release); // make sure that `0` is visible *before* the non-atomic element modification
m_buffer[nextPos].elem = elem;
m_position.store(nextPos, std::memory_order_release);
}
// thread-safety: multiple readers are ok. First one to claim an entry gets it
// check claimed flag before/after to detect overwrite, like a SeqLock
const bool read(T &elem)
{
int rPos = m_position.load(std::memory_order_acquire);
int8_t claimed = m_buffer[rPos].claimed.load(std::memory_order_relaxed);
if (claimed != 0)
return false; // read-only early-out
claimed = 0;
if (!m_buffer[rPos].claimed.compare_exchange_strong(
claimed, 1, std::memory_order_acquire, std::memory_order_relaxed))
return false; // strong CAS failed: another thread claimed it
elem = m_buffer[rPos].elem;
// final check that the writer didn't step on this buffer during read, like a SeqLock
std::atomic_thread_fence(std::memory_order_acquire); // LoadLoad barrier
// We expect it to still be claimed=1 like we set with CAS
// Otherwise we raced with a writer and elem may be torn.
// optionally retry once or twice in this case because we know there's a new value waiting to be read.
return m_buffer[rPos].claimed.load(std::memory_order_relaxed) == 1;
// Note that elem can be updated even if we return false, if there was tearing. Use a temporary if that's not ok.
}
使用 claimed = m_buffer[rPos].exchange(1)
并检查 claimed==0
将是另一种选择,而不是 CAS-strong。也许在 x86 上效率更高一些。在 LL/SC 机器上,如果 CAS 发现与 expected
不匹配,我猜 CAS 可能可以在根本不进行写入的情况下退出,在这种情况下,只读检查毫无意义。
我使用 .claimed.compare_exchange_strong(claimed, 1)
并成功订购 = acquire
以确保 claimed
的读取发生在读取 .elem
.[=209 之前=]
"failure"内存排序可以是relaxed
:如果我们看到它已经被另一个线程声明,我们放弃并且不查看任何共享数据。
compare_exchange_strong
的存储部分的内存顺序可以是relaxed
,所以我们只需要mo_acquire
,而不是acq_rel
。读者不会对共享数据进行任何 other 存储,我认为存储的顺序无关紧要。到负载。 CAS 是原子 RMW。只有一个线程的 CAS 可以在给定的缓冲区元素上成功,因为它们都试图将它从 0 设置为 1。这就是原子 RMW 的工作方式,无论是 relaxed 还是 seq_cst 或介于两者之间的任何东西。
不需要seq_cst:我们不需要刷新存储缓冲区或任何确保存储在此线程之前可见的东西读 .elem
。仅仅作为一个原子 RMW 就足以阻止多个线程真正认为它们成功了。 Release 只会确保它不能在宽松的只读检查之前提前移动。那不会是一个正确性问题。希望没有 x86 编译器会在编译时这样做。 (在 x86 上 运行 时间,RMW 原子操作总是 seq_cst。)
我认为作为一个 RMW 使得它不可能 "step on" 一个作者的写信(在回绕之后)。但这可能是真正的 CPU 实现细节,而不是 ISO C++。在任何给定 .claimed
的全局修改顺序中,我认为 RMW 保持在一起,而 "acquire" 顺序确实使其领先于 .elem
的读取。不属于 RMW 的 release
存储将是一个潜在的问题:作者可以环绕并将 claimed=0
放入新条目,然后 reader 的存储可以最终提交并将其设置为 1,实际上没有 reader 曾经读取过该元素。
如果我们非常确定 reader 不需要检测循环缓冲区的 writer wrap-around,请在 writer 中省略 std::atomic_thread_fence
和 reader。 (已声明和非原子元素存储仍将由发布存储订购到 m_position
)。 reader 可以简化为省去第二次检查,如果通过 CAS,则始终 return 为真。
请注意,m_buffer[nextPos].claimed.store(0, std::memory_order_release);
不会足以阻止后来的非原子存储出现在它之前:发布存储是一种单向屏障,与发布不同栅栏。释放栅栏就像一个双向 StoreStore 屏障。 (在 x86 上免费,在其他 ISA 上便宜。)
不幸的是,这种 SeqLock 风格的撕裂检测在技术上并不能避免 C++ 抽象机中的 UB。在 ISO C++ 中没有好的/安全的方式来表达这种模式,并且众所周知它在真实硬件上的 asm 中是安全的。没有实际使用撕裂值(假设 read()
的调用者忽略它的 elem
值,如果它 return 为假)。
将 elem
设为 std::atomic<T>
会破坏整个目的:这将使用自旋锁来获得原子性,因此它还不如直接使用它。
使用 volatile T elem
会破坏 buffer[i].elem = elem
因为与 C 不同,C++ 不允许将易失性结构 to/from 复制为常规结构。 ()。这对于 SeqLock 类型的模式来说非常烦人,您希望编译器发出高效代码来复制整个对象表示,可选择使用 SIMD 向量。如果您编写一个采用 volatile &T
参数并执行单个成员的构造函数或赋值运算符,您将不会得到它。很明显 volatile
是错误的工具,它只会留下编译器内存屏障,以确保非原子对象在屏障之前被完全读取或完全写入。 std::atomic_thread_fence
我 认为 实际上是安全的,就像 GNU C 中的 asm("" ::: "memory")
一样。它在当前的编译器上实际工作。
简介
我有一个小的 class,它利用 std::atomic 进行无锁操作。由于这个 class 被大量调用,它影响了性能,我遇到了麻烦。
Class 描述
class类似于后进先出,但是一旦pop()函数被调用,它只会return它的ring-buffer最后写入的元素(只有自从有新元素最后弹出())。
一个线程正在调用 push(),另一个线程正在调用 pop()。
我读过的来源
由于这占用了我太多的电脑时间,我决定进一步研究 std::atomic class 及其 memory_order。我在 Whosebug 和其他资源和书籍中阅读了很多可用的 memory_order post,但我无法清楚地了解不同的模式。特别是,我在获取和释放模式之间挣扎:我也看不出为什么它们与 memory_order_seq_cst.
不同我认为每个记忆顺序使用我自己的研究
memory_order_relaxed:在同一个线程中,原子操作是即时的,但其他线程可能无法立即看到最新的值,他们需要一些时间直到它们被更新。代码可以由编译器或 OS.
自由重新排序memory_order_获取/发布: 由 atomic::load 使用。它防止在此之前的代码行被重新排序(compiler/OS 可能会在此行之后重新排序),并使用 [=101 读取存储在此原子上的最新值=]_release 或 memory_order_seq_cst 在这个线程或另一个线程中。 memory_order_release 还可以防止该代码在重新排序后出现。因此,在 acquire/release 中,两者之间的所有代码都可以被 OS 打乱。我不确定那是在同一线程之间,还是在不同线程之间。
memory_order_seq_cst:最简单易用,因为它就像我们使用变量的自然写法,立即刷新其他线程加载函数的值。
LockFreeEx class
template<typename T>
class LockFreeEx
{
public:
void push(const T& element)
{
const int wPos = m_position.load(std::memory_order_seq_cst);
const int nextPos = getNextPos(wPos);
m_buffer[nextPos] = element;
m_position.store(nextPos, std::memory_order_seq_cst);
}
const bool pop(T& returnedElement)
{
const int wPos = m_position.exchange(-1, std::memory_order_seq_cst);
if (wPos != -1)
{
returnedElement = m_buffer[wPos];
return true;
}
else
{
return false;
}
}
private:
static constexpr int maxElements = 8;
static constexpr int getNextPos(int pos) noexcept {return (++pos == maxElements)? 0 : pos;}
std::array<T, maxElements> m_buffer;
std::atomic<int> m_position {-1};
};
我希望如何改进它
所以,我的第一个想法是在所有原子操作中使用 memory_order_relaxed,因为 pop() 线程处于循环中,每 10-15 毫秒在 pop 函数中寻找可用的更新,然后它被允许最初的 pop() 函数失败,后来才意识到有新的更新。只是几毫秒而已。
另一种选择是使用 release/acquire - 但我不确定它们。在 all store() 中使用 release,在 all load() 函数中使用 acquire。
不幸的是,我描述的所有 memory_order 似乎都有效,而且我不确定它们何时会失败,如果它们应该会失败的话。
决赛
请问,如果您在此处使用宽松的内存顺序时发现问题,能否告诉我?或者我应该使用 release/acquire (也许对这些的进一步解释可以帮助我)?为什么?
我认为 relaxed 最适合这个 class,在它的所有 store() 或 load() 中。但我不确定!
感谢阅读。
编辑:额外说明:
看到大家都在问'char',所以改成int,问题解决!但这不是我要解决的问题。
正如我之前所说,class 可能是后进先出法,但只有最后一个元素被推入的地方才重要,如果有的话。
我有一个很大的结构 T(可复制和可分配),我必须以无锁方式在两个线程之间共享它。所以,我知道的唯一方法是使用一个循环缓冲区来写入 T 的最后一个已知值,以及一个知道最后一个写入值的索引的原子。如果没有,索引将为 -1。
请注意,我的推送线程必须知道何时有 "new T" 可用,这就是为什么 pop() return 是一个布尔值。
再次感谢所有试图帮助我处理记忆命令的人! :)
阅读解决方案后:
template<typename T>
class LockFreeEx
{
public:
LockFreeEx() {}
LockFreeEx(const T& initValue): m_data(initValue) {}
// WRITE THREAD - CAN BE SLOW, WILL BE CALLED EACH 500-800ms
void publish(const T& element)
{
// I used acquire instead relaxed to makesure wPos is always the lastest w_writePos value, and nextPos calculates the right one
const int wPos = m_writePos.load(std::memory_order_acquire);
const int nextPos = (wPos + 1) % bufferMaxSize;
m_buffer[nextPos] = element;
m_writePos.store(nextPos, std::memory_order_release);
}
// READ THREAD - NEED TO BE VERY FAST - CALLED ONCE AT THE BEGGINING OF THE LOOP each 2ms
inline void update()
{
// should I change to relaxed? It doesn't matter I don't get the new value or the old one, since I will call this function again very soon, and again, and again...
const int writeIndex = m_writePos.load(std::memory_order_acquire);
// Updating only in case there is something new... T may be a heavy struct
if (m_readPos != writeIndex)
{
m_readPos = writeIndex;
m_data = m_buffer[m_readPos];
}
}
// NEED TO BE LIGHTNING FAST, CALLED MULTIPLE TIMES IN THE READ THREAD
inline const T& get() const noexcept {return m_data;}
private:
// Buffer
static constexpr int bufferMaxSize = 4;
std::array<T, bufferMaxSize> m_buffer;
std::atomic<int> m_writePos {0};
int m_readPos = 0;
// Data
T m_data;
};
内存顺序与您何时看到对原子对象的某些特定更改无关,而是与此更改可以保证周围代码的内容有关。 Relaxed atomics 除了对原子对象本身的更改外,什么都不保证:更改将是原子的。但是您不能在任何同步上下文中使用松散原子。
并且您有一些代码需要同步。您想要弹出已推送的内容,而不是尝试弹出尚未推送的内容。因此,如果您使用宽松的操作,则无法保证您的 pop 会看到此推送代码:
m_buffer[nextPos] = element;
m_position.store(nextPos, std::memory_relaxed);
如上所写。也可以这样看:
m_position.store(nextPos, std::memory_relaxed);
m_buffer[nextPos] = element;
因此您可能会尝试从缓冲区中获取一个尚不存在的元素。因此,您必须使用一些同步并且至少使用 acquire/release 内存顺序。
以及您的实际代码。我觉得顺序可以如下:
const char wPos = m_position.load(std::memory_order_relaxed);
...
m_position.store(nextPos, std::memory_order_release);
...
const char wPos = m_position.exchange(-1, memory_order_acquire);
你的作者只需要release
,不需要seq-cst,但是relaxed
太弱了。在 m_buffer[]
条目的非原子赋值 之后 之前,您不能发布 m_position
的值。 您需要发布顺序以确保 m_position
存储仅对其他线程可见 在 所有较早的内存操作之后。 (包括非原子分配)。 https://preshing.com/20120913/acquire-and-release-semantics/
这必须 "synchronize-with" 获取或 seq_cst 加载到 reader。或者在 reader.
中至少mo_consume
理论上你还需要wpos = m_position
至少是acquire
(或者consume
中的reader),不放松,因为C++11的内存模型对于值预测之类的事情来说足够弱,它可以让编译器在加载实际从连贯缓存中获取值之前推测性地使用 wPos
的值。
(在实际 CPUs 上的实践中,疯狂的编译器可以使用 test/branch 来引入控制依赖性,从而允许分支预测 + 推测执行来打破可能值的数据依赖性wPos
.)
但是普通的编译器不会那样做。在除 DEC Alpha 之外的 CPU 上,wPos = m_position
源代码中的数据依赖性然后使用 m_buffer[wPos]
将在 asm 中创建数据依赖性,如 mo_consume
应该从中获利。 Alpha 以外的真实 ISA 保证依赖加载的依赖顺序。 (甚至在 Alpha 上,使用宽松的原子交换可能足以关闭存在于少数允许重新排序的真实 Alpha CPU 上的微小 window。)
为 x86 编译时,使用 mo_acquire
没有任何缺点;它不需要任何额外的障碍。在其他 ISA 上可能存在,例如 32 位 ARM,其中 acquire
成本障碍,因此 "cheating" 具有宽松的负载可能是一个胜利,在实践中仍然是安全的。当前的编译器总是将 mo_consume
强化为 mo_acquire
,因此很遗憾我们无法利用它。
即使使用 seq_cst
,您也已经有了真实的竞争条件。
- 初始状态:
m_position = 0
- reader "claims" 插槽 0 通过在
m_position = -1
中交换并读取 部分m_buffer[0];
- reader 出于某种原因休眠(例如定时器中断取消调度),或者只是与作家赛跑。
- 编写器将
wPos = m_position
读取为-1
,并计算nextPos = 0
。
覆盖部分读取的m_buffer[0]
- reader一觉醒来读完了,撕破了
T &element
。 C++抽象机中的数据竞赛UB,实践中的撕逼
在读取后添加对 m_position
的第二次检查(如 SeqLock)无法在所有情况下检测到这一点,因为编写器直到 后才更新 m_position
写入缓冲区元素。
即使您的实际用例在读取和写入之间存在很长的间隔,这个缺陷也会让您几乎同时发生一次读取和写入。
I for sure know that the read side cannot wait for nothing and cannot be stopped (it's audio) and it's poped each 5-10ms, and the write side is the user input, which is more slower, a faster one could do a push once each 500ms.
一毫秒在现代 CPU 上是 ages。线程间延迟通常约为 60 ns,因此不到一微秒,例如来自四核 Intel x86。只要你不睡在互斥锁上,在放弃之前旋转重试一两次就不是问题。
代码审查:
The class similar to a LIFO, but once the pop() function is called, it only return the last written element of its ring-buffer (only if there are new elements since last pop()).
这不是真正的队列或堆栈:push 和 pop 不是好名字。 "publish" 和 "read" 或 "get" 可能更好,并使它的用途更加明显。
我会在代码中加入注释来描述这样一个事实,即这对单个作者、多个 reader 是安全的。 (push
中 m_position
的非原子增量使得它对多个编写者显然不安全。)
即便如此,1个作者+1个reader运行ning同时出现还是有点奇怪。如果在写入正在进行时读取开始,它将获得 "old" 值,而不是自旋等待几分之一微秒以获取新值。然后下次它读取时已经有一个新的值在等待;上次它错过的那个。所以例如m_position
可以按顺序更新:2, -1, 3.
这可能是可取的,也可能不是可取的,这取决于 "stale" 数据是否具有任何价值,以及如果作者在写入过程中休眠时 reader 阻塞的可接受性。甚至在作者不睡觉的情况下,旋转等待。
很少写入的具有 多个 只读 readers 的小型数据的标准模式是 SeqLock。例如用于在无法自动读取或写入 128 位值的 CPU 上发布 128 位当前时间戳。参见
可能的设计更改
为了确保安全,我们可以让 writer 运行 自由,始终围绕其循环缓冲区,并且 让 reader 跟踪它查看的最后一个元素在.
如果只有一个reader,这应该是一个简单的非原子变量。如果它是一个实例变量,至少把它放在写位置 m_buffer[]
的另一边。
// Possible failure mode: writer wraps around between reads, leaving same m_position
// single-reader
const bool read(T &elem)
{
// FIXME: big hack to get this in a separate cache line from the instance vars
// maybe instead use alignas(64) int m_lastread as a class member, and/or on the other side of m_buffer from m_position.
static int lastread = -1;
int wPos = m_position.load(std::memory_order_acquire); // or cheat with relaxed to get asm that's like "consume"
if (lastread == wPos)
return false;
elem = m_buffer[wPos];
lastread = wPos;
return true;
}
您希望 lastread
与作者写入的内容在单独的缓存行中。否则 reader 的 readPos 更新会变慢,因为与 writer 的写入存在虚假共享,反之亦然。
这让 reader(s) 成为真正的只读 wrt。作者写入的缓存行。不过,它仍然需要 MESI 流量来请求对在作者写入后处于修改状态的行的读取访问权限。但是写入器仍然可以 read m_position
而不会丢失缓存,因此它可以立即将其存储到存储缓冲区中。它只需要等待 RFO 获得缓存行的独占所有权,然后才能将元素和更新的 m_position
从其存储缓冲区提交到 L1d 缓存。
TODO:让 m_position
递增而不手动换行,因此我们有一个写入序列号需要很长时间才能换行,避免早期假阴性来自 lastread == wPos
.
使用wPos & (maxElements-1)
作为索引。并且 static_assert(maxElements & (maxElements-1) == 0, "maxElements must be a power of 2");
那么唯一的危险是在很短的时间内未被发现的撕裂-window 如果编写器一直环绕并且正在写入正在读取的元素。对于频繁读取和不频繁写入,以及不太小的缓冲区,这种情况永远不会发生。读取后再次检查 m_position
(类似于 SeqLock,类似于下面)将竞争 window 缩小为仅仍在进行中的写入。
如果有多个 reader,另一个好的选择可能是每个 m_buffer
条目 中的 claimed
标志。所以你会定义
template<typename T>
class WaitFreePublish
{
private:
struct {
alignas(32) T elem; // at most 2 elements per cache line
std::atomic<int8_t> claimed; // writers sets this to 0, readers try to CAS it to 1
// could be bool if we don't end up needing 3 states for anything.
// set to "1" in the constructor? or invert and call it "unclaimed"
} m_buffer[maxElements];
std::atomic<int> m_position {-1};
}
如果 T
末尾有填充,很遗憾我们不能将其用于 claimed
标志:/
这避免了比较位置可能的失败模式:如果写入器在读取之间回绕,最糟糕的情况就是撕裂。我们可以通过让作者先清除 claimed
标志来检测这种撕裂,然后再写入元素的其余部分。
没有其他线程写入m_position
,我们绝对可以放心地使用宽松的负载。我们甚至可以在其他地方缓存写入位置,但 reader 希望不会经常使包含 m_position
的缓存行无效。显然在您的用例中,writer performance/latency 可能没什么大不了的。
所以作者 + reader 可能看起来像这样,使用已知更新顺序的 SeqLock 式撕裂检测来声明标志、元素和 m_position。
/// claimed flag per array element supports concurrent readers
// thread-safety: single-writer only
// update claimed flag first, then element, then m_position.
void publish(const T& elem)
{
const int wPos = m_position.load(std::memory_order_relaxed);
const int nextPos = getNextPos(wPos);
m_buffer[nextPos].claimed.store(0, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_release); // make sure that `0` is visible *before* the non-atomic element modification
m_buffer[nextPos].elem = elem;
m_position.store(nextPos, std::memory_order_release);
}
// thread-safety: multiple readers are ok. First one to claim an entry gets it
// check claimed flag before/after to detect overwrite, like a SeqLock
const bool read(T &elem)
{
int rPos = m_position.load(std::memory_order_acquire);
int8_t claimed = m_buffer[rPos].claimed.load(std::memory_order_relaxed);
if (claimed != 0)
return false; // read-only early-out
claimed = 0;
if (!m_buffer[rPos].claimed.compare_exchange_strong(
claimed, 1, std::memory_order_acquire, std::memory_order_relaxed))
return false; // strong CAS failed: another thread claimed it
elem = m_buffer[rPos].elem;
// final check that the writer didn't step on this buffer during read, like a SeqLock
std::atomic_thread_fence(std::memory_order_acquire); // LoadLoad barrier
// We expect it to still be claimed=1 like we set with CAS
// Otherwise we raced with a writer and elem may be torn.
// optionally retry once or twice in this case because we know there's a new value waiting to be read.
return m_buffer[rPos].claimed.load(std::memory_order_relaxed) == 1;
// Note that elem can be updated even if we return false, if there was tearing. Use a temporary if that's not ok.
}
使用 claimed = m_buffer[rPos].exchange(1)
并检查 claimed==0
将是另一种选择,而不是 CAS-strong。也许在 x86 上效率更高一些。在 LL/SC 机器上,如果 CAS 发现与 expected
不匹配,我猜 CAS 可能可以在根本不进行写入的情况下退出,在这种情况下,只读检查毫无意义。
我使用 .claimed.compare_exchange_strong(claimed, 1)
并成功订购 = acquire
以确保 claimed
的读取发生在读取 .elem
.[=209 之前=]
"failure"内存排序可以是relaxed
:如果我们看到它已经被另一个线程声明,我们放弃并且不查看任何共享数据。
compare_exchange_strong
的存储部分的内存顺序可以是relaxed
,所以我们只需要mo_acquire
,而不是acq_rel
。读者不会对共享数据进行任何 other 存储,我认为存储的顺序无关紧要。到负载。 CAS 是原子 RMW。只有一个线程的 CAS 可以在给定的缓冲区元素上成功,因为它们都试图将它从 0 设置为 1。这就是原子 RMW 的工作方式,无论是 relaxed 还是 seq_cst 或介于两者之间的任何东西。
不需要seq_cst:我们不需要刷新存储缓冲区或任何确保存储在此线程之前可见的东西读 .elem
。仅仅作为一个原子 RMW 就足以阻止多个线程真正认为它们成功了。 Release 只会确保它不能在宽松的只读检查之前提前移动。那不会是一个正确性问题。希望没有 x86 编译器会在编译时这样做。 (在 x86 上 运行 时间,RMW 原子操作总是 seq_cst。)
我认为作为一个 RMW 使得它不可能 "step on" 一个作者的写信(在回绕之后)。但这可能是真正的 CPU 实现细节,而不是 ISO C++。在任何给定 .claimed
的全局修改顺序中,我认为 RMW 保持在一起,而 "acquire" 顺序确实使其领先于 .elem
的读取。不属于 RMW 的 release
存储将是一个潜在的问题:作者可以环绕并将 claimed=0
放入新条目,然后 reader 的存储可以最终提交并将其设置为 1,实际上没有 reader 曾经读取过该元素。
如果我们非常确定 reader 不需要检测循环缓冲区的 writer wrap-around,请在 writer 中省略 std::atomic_thread_fence
和 reader。 (已声明和非原子元素存储仍将由发布存储订购到 m_position
)。 reader 可以简化为省去第二次检查,如果通过 CAS,则始终 return 为真。
请注意,m_buffer[nextPos].claimed.store(0, std::memory_order_release);
不会足以阻止后来的非原子存储出现在它之前:发布存储是一种单向屏障,与发布不同栅栏。释放栅栏就像一个双向 StoreStore 屏障。 (在 x86 上免费,在其他 ISA 上便宜。)
不幸的是,这种 SeqLock 风格的撕裂检测在技术上并不能避免 C++ 抽象机中的 UB。在 ISO C++ 中没有好的/安全的方式来表达这种模式,并且众所周知它在真实硬件上的 asm 中是安全的。没有实际使用撕裂值(假设 read()
的调用者忽略它的 elem
值,如果它 return 为假)。
将 elem
设为 std::atomic<T>
会破坏整个目的:这将使用自旋锁来获得原子性,因此它还不如直接使用它。
使用 volatile T elem
会破坏 buffer[i].elem = elem
因为与 C 不同,C++ 不允许将易失性结构 to/from 复制为常规结构。 (volatile &T
参数并执行单个成员的构造函数或赋值运算符,您将不会得到它。很明显 volatile
是错误的工具,它只会留下编译器内存屏障,以确保非原子对象在屏障之前被完全读取或完全写入。 std::atomic_thread_fence
我 认为 实际上是安全的,就像 GNU C 中的 asm("" ::: "memory")
一样。它在当前的编译器上实际工作。