C++ atomic 用 atomic<int> 替换 mutex 安全吗?
C++ atomic is it safe to replace a mutex with an atomic<int>?
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
using namespace std;
const int FLAG1 = 1, FLAG2 = 2, FLAG3 = 3;
int res = 0;
atomic<int> flagger;
void func1()
{
for (int i=1; i<=1000000; i++) {
while (flagger.load(std::memory_order_relaxed) != FLAG1) {}
res++; // maybe a bunch of other code here that don't modify flagger
// code here must not be moved outside the load/store (like mutex lock/unlock)
flagger.store(FLAG2, std::memory_order_relaxed);
}
cout << "Func1 finished\n";
}
void func2()
{
for (int i=1; i<=1000000; i++) {
while (flagger.load(std::memory_order_relaxed) != FLAG2) {}
res++; // same
flagger.store(FLAG3, std::memory_order_relaxed);
}
cout << "Func2 finished\n";
}
void func3() {
for (int i=1; i<=1000000; i++) {
while (flagger.load(std::memory_order_relaxed) != FLAG3) {}
res++; // same
flagger.store(FLAG1, std::memory_order_relaxed);
}
cout << "Func3 finished\n";
}
int main()
{
flagger = FLAG1;
std::thread first(func1);
std::thread second(func2);
std::thread third(func3);
first.join();
second.join();
third.join();
cout << "res = " << res << "\n";
return 0;
}
我的程序有一段类似于这个例子。基本上,有 3 个线程:输入器、处理器和输出器。我发现使用 atomic 的忙等待比使用条件变量让线程休眠更快,例如:
std::mutex commonMtx;
std::condition_variable waiter1, waiter2, waiter3;
// then in func1()
unique_lock<std::mutex> uniquer1(commonMtx);
while (flagger != FLAG1) waiter1.wait(uniquer1);
但是,示例中的代码安全吗?当我 运行 它给出正确的结果(-std=c++17 -O3
标志)。但是,我不知道编译器是否可以将我的指令重新排序到原子 check/set 块之外,尤其是 std::memory_order_relaxed
。如果它不安全,那么有什么方法可以让它安全同时比互斥锁更快?
编辑:保证线程数 < CPU 核心数
std::memory_order_relaxed
导致除原子本身外,无法保证内存操作的顺序。
因此,您的所有 res++;
操作都是数据竞争,并且您的程序具有未定义的行为。
示例:
#include<atomic>
int x;
std::atomic<int> a{0};
void f() {
x = 1;
a.store(1, std::memory_order_relaxed);
x = 2;
}
Clang 13 on x86_64 with -O2
将此函数编译为
mov dword ptr [rip + a], 1
mov dword ptr [rip + x], 2
ret
(https://godbolt.org/z/hxjYeG5dv)
即使在缓存一致的平台上,在第一个和第二个 mov
之间,另一个线程可以观察到 a
设置为 1
,但 x
未设置为1
.
您必须使用 memory_order_release
和 memory_order_acquire
(或顺序一致性)来替换互斥锁。
(我应该补充一点,我没有详细检查你的代码,所以我不能说在你的具体情况下简单地替换内存顺序就足够了。)
正如另一个答案中提到的,res++;
在不同的线程中彼此不同步,并导致数据竞争和未定义的行为。这可以用线程清洁剂检查。
要解决此问题,您需要使用 memory_order_acquire
加载和 memory_order_release
存储。修复也可以用线程消毒剂来确认。
while (flagger.load(std::memory_order_acquire) != FLAG1) {}
res++;
flagger.store(FLAG2, std::memory_order_release);
或者,flagger.load(std::memory_order_acquire)
可以换成flagger.load(std::memory_order_relaxed)
,循环后跟std::atomic_thread_fence(std::memory_order_acquire);
。
while (flagger.load(std::memory_order_relaxed) != FLAG1) {}
std::atomic_thread_fence(std::memory_order_acquire);
res++;
flagger.store(FLAG2, std::memory_order_release);
我不确定它能提高多少性能,如果有的话。
思路是循环中只有最后一个加载需要是acquire操作,这就是fence实现的。
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
using namespace std;
const int FLAG1 = 1, FLAG2 = 2, FLAG3 = 3;
int res = 0;
atomic<int> flagger;
void func1()
{
for (int i=1; i<=1000000; i++) {
while (flagger.load(std::memory_order_relaxed) != FLAG1) {}
res++; // maybe a bunch of other code here that don't modify flagger
// code here must not be moved outside the load/store (like mutex lock/unlock)
flagger.store(FLAG2, std::memory_order_relaxed);
}
cout << "Func1 finished\n";
}
void func2()
{
for (int i=1; i<=1000000; i++) {
while (flagger.load(std::memory_order_relaxed) != FLAG2) {}
res++; // same
flagger.store(FLAG3, std::memory_order_relaxed);
}
cout << "Func2 finished\n";
}
void func3() {
for (int i=1; i<=1000000; i++) {
while (flagger.load(std::memory_order_relaxed) != FLAG3) {}
res++; // same
flagger.store(FLAG1, std::memory_order_relaxed);
}
cout << "Func3 finished\n";
}
int main()
{
flagger = FLAG1;
std::thread first(func1);
std::thread second(func2);
std::thread third(func3);
first.join();
second.join();
third.join();
cout << "res = " << res << "\n";
return 0;
}
我的程序有一段类似于这个例子。基本上,有 3 个线程:输入器、处理器和输出器。我发现使用 atomic 的忙等待比使用条件变量让线程休眠更快,例如:
std::mutex commonMtx;
std::condition_variable waiter1, waiter2, waiter3;
// then in func1()
unique_lock<std::mutex> uniquer1(commonMtx);
while (flagger != FLAG1) waiter1.wait(uniquer1);
但是,示例中的代码安全吗?当我 运行 它给出正确的结果(-std=c++17 -O3
标志)。但是,我不知道编译器是否可以将我的指令重新排序到原子 check/set 块之外,尤其是 std::memory_order_relaxed
。如果它不安全,那么有什么方法可以让它安全同时比互斥锁更快?
编辑:保证线程数 < CPU 核心数
std::memory_order_relaxed
导致除原子本身外,无法保证内存操作的顺序。
因此,您的所有 res++;
操作都是数据竞争,并且您的程序具有未定义的行为。
示例:
#include<atomic>
int x;
std::atomic<int> a{0};
void f() {
x = 1;
a.store(1, std::memory_order_relaxed);
x = 2;
}
Clang 13 on x86_64 with -O2
将此函数编译为
mov dword ptr [rip + a], 1
mov dword ptr [rip + x], 2
ret
(https://godbolt.org/z/hxjYeG5dv)
即使在缓存一致的平台上,在第一个和第二个 mov
之间,另一个线程可以观察到 a
设置为 1
,但 x
未设置为1
.
您必须使用 memory_order_release
和 memory_order_acquire
(或顺序一致性)来替换互斥锁。
(我应该补充一点,我没有详细检查你的代码,所以我不能说在你的具体情况下简单地替换内存顺序就足够了。)
正如另一个答案中提到的,res++;
在不同的线程中彼此不同步,并导致数据竞争和未定义的行为。这可以用线程清洁剂检查。
要解决此问题,您需要使用 memory_order_acquire
加载和 memory_order_release
存储。修复也可以用线程消毒剂来确认。
while (flagger.load(std::memory_order_acquire) != FLAG1) {}
res++;
flagger.store(FLAG2, std::memory_order_release);
或者,flagger.load(std::memory_order_acquire)
可以换成flagger.load(std::memory_order_relaxed)
,循环后跟std::atomic_thread_fence(std::memory_order_acquire);
。
while (flagger.load(std::memory_order_relaxed) != FLAG1) {}
std::atomic_thread_fence(std::memory_order_acquire);
res++;
flagger.store(FLAG2, std::memory_order_release);
我不确定它能提高多少性能,如果有的话。
思路是循环中只有最后一个加载需要是acquire操作,这就是fence实现的。