std::atomic 和 std::mutex 的相对表现
Relative performance of std::atomic and std::mutex
我正在考虑为项目实施队列的选项,其要求是生产者至少必须尽可能低延迟。为此,我一直在研究 "lock free" 队列使用 std::atomic
来控制生产者和消费者线程对数据结构的访问。我希望这将避免 std::mutex
中的开销,特别是代码当前使用的 std::unique_lock
。
为此,我编写了一个简单的测试程序来评估std::mutex
(加上std::unique_lock
)和std::atomic
的相对性能。该程序还进行检查以确保原子对象是无锁的,确实如此。
#include <mutex>
#include <atomic>
#include <thread>
#include <chrono>
#include <iostream>
#define CYCLES 100000000
void testAtomic()
{
bool var(true);
std::atomic_bool _value(true);
std::cout << "atomic bool is ";
if(!_value.is_lock_free())
std::cout << "not ";
std::cout << "lock free" << std::endl;
const auto _start_time = std::chrono::high_resolution_clock::now();
for(size_t counter = 0; counter < CYCLES; counter++)
{
var = _value.load();
var = !var;
_value.store(var);
}
const auto _end_time = std::chrono::high_resolution_clock::now();
std::cout << 1e-6 * std::chrono::duration_cast<std::chrono::microseconds>
(_end_time - _start_time).count() << " s" << std::endl;
}
void testMutex()
{
bool var(true);
std::mutex _mutex;
std::chrono::high_resolution_clock _clock;
const auto _start_time = std::chrono::high_resolution_clock::now();
for(size_t counter = 0; counter < CYCLES; counter++)
{
std::unique_lock<std::mutex> lock(_mutex);
var = !var;
}
const auto _end_time = std::chrono::high_resolution_clock::now();
std::cout << 1e-6 * std::chrono::duration_cast<std::chrono::microseconds>
(_end_time - _start_time).count() << " s" << std::endl;
}
int main()
{
std::thread t1(testAtomic);
t1.join();
std::thread t2(testMutex);
t2.join();
return 0;
}
当运行这个程序时,我得到以下输出:
atomic bool is lock free
3.49434 s
2.31755 s
这向我表明 std::mutex
(和 std::unique_lock
)要快得多,这与我在阅读原子与互斥锁时所期望的相反。我的发现是否正确?我的测试程序有问题吗? 我对两者的区别理解有误吗?
代码是在 CentOS7 上用 GCC 4.8.5 编译的
有人告诉我,在互斥锁的某些实现中,最初它会先在内部自旋锁。
这可能就是它看起来更快的原因。
如果进行系统调用,我怀疑你会得到相同的结果。
(我无法验证这一点,但我认为这可能是一个原因)
你在什么硬件上测试?
由于您使用的是 GCC,因此 std::atomic
seq_cst 商店将使用 mov
+ 慢速 mfence
而不是稍微慢一点的 xchg
-with-mem(这也是一个完整的屏障,就像所有其他 x86 原子 RMW 操作一样)。
使用互斥量需要一个原子 RMW(类似于 xchg
,而不是 mov + mfence)。如果你幸运的话,释放互斥体可能只是一个普通的存储(比如 mo_release
)。争用为零,因此获取锁总是成功的。
这些互斥锁定/解锁库函数背后的代码比 mfence
、 更便宜,这当然是合理的,尤其是在具有更新微码的 Skylake CPU 上,其中 mfence
是乱序执行的完整屏障以及内存。 (见底this answer, and also )
此外,请注意,您的互斥锁循环将本地 bool var
优化为一个寄存器,实际上并没有在循环内的内存中更新它。 (您的代码 on the Godbolt compiler explorer with gcc4.8.5)。
# the main loop from testMutex
.L80: # do {
mov rdi, rsp # pointer to _mutex on the stack
call __gthrw_pthread_mutex_lock(pthread_mutex_t*)
test eax, eax
jne .L91 # mutex error handling
mov rdi, rsp # pointer to _mutex again
call __gthrw_pthread_mutex_unlock(pthread_mutex_t*)
sub rbx, 1
jne .L80 # }while(--counter)
循环内的 xor bl, 1
是无关紧要的;乱序执行可能会与其他工作重叠。
如果对 var
的引用转义了该函数,因此编译器必须在非内联函数调用(包括对 pthread 库函数)之前将其同步到内存中,我们期望像 xor byte ptr [rsp+8], 1
。这也将是相当便宜的,并且可能大部分被无序执行程序隐藏,尽管 load/ALU/store 可能是耗尽存储缓冲区时必须等待的完整屏障。
正在加速您的 std::atomic
代码:
您似乎有意避免执行原子 RMW,而是加载到 tmp var 中并执行单独的存储。如果您只使用 release 而不是 seq_cst,那么它就可以在 x86 上编译为一条普通的存储指令。 (或者在大多数其他 ISA 上更便宜的障碍)。
bool tmp = _value.load(std::memory_order_relaxed); // or acquire
_value.store(!tmp, std::memory_order_release);
这应该 运行 每次反转大约 6 个周期,只是一个 ALU 操作的延迟加上 store/reload 的存储转发延迟。对于 mfence
(https://uops.info/).
的最佳吞吐量,每次迭代可能需要 33 个周期
或者由于这是非原子修改,只需存储交替值而不重新读取旧值。在只有一个生产者写入值而其他线程正在读取的情况下,通常只能避免原子 RMW。因此,让生产者将它正在修改的值保存在寄存器中(非原子本地变量),并存储副本。
bool var = true;
for(size_t counter = 0; counter < CYCLES; counter++)
{
var = !var;
_value.store(var, std::memory_order_release);
}
此外,不要在您自己的变量名称中使用前导下划线。这些名称保留用于实现。 (小写的单个 _
仅在文件/全局范围内保留,但这仍然是不好的做法。)
我正在考虑为项目实施队列的选项,其要求是生产者至少必须尽可能低延迟。为此,我一直在研究 "lock free" 队列使用 std::atomic
来控制生产者和消费者线程对数据结构的访问。我希望这将避免 std::mutex
中的开销,特别是代码当前使用的 std::unique_lock
。
为此,我编写了一个简单的测试程序来评估std::mutex
(加上std::unique_lock
)和std::atomic
的相对性能。该程序还进行检查以确保原子对象是无锁的,确实如此。
#include <mutex>
#include <atomic>
#include <thread>
#include <chrono>
#include <iostream>
#define CYCLES 100000000
void testAtomic()
{
bool var(true);
std::atomic_bool _value(true);
std::cout << "atomic bool is ";
if(!_value.is_lock_free())
std::cout << "not ";
std::cout << "lock free" << std::endl;
const auto _start_time = std::chrono::high_resolution_clock::now();
for(size_t counter = 0; counter < CYCLES; counter++)
{
var = _value.load();
var = !var;
_value.store(var);
}
const auto _end_time = std::chrono::high_resolution_clock::now();
std::cout << 1e-6 * std::chrono::duration_cast<std::chrono::microseconds>
(_end_time - _start_time).count() << " s" << std::endl;
}
void testMutex()
{
bool var(true);
std::mutex _mutex;
std::chrono::high_resolution_clock _clock;
const auto _start_time = std::chrono::high_resolution_clock::now();
for(size_t counter = 0; counter < CYCLES; counter++)
{
std::unique_lock<std::mutex> lock(_mutex);
var = !var;
}
const auto _end_time = std::chrono::high_resolution_clock::now();
std::cout << 1e-6 * std::chrono::duration_cast<std::chrono::microseconds>
(_end_time - _start_time).count() << " s" << std::endl;
}
int main()
{
std::thread t1(testAtomic);
t1.join();
std::thread t2(testMutex);
t2.join();
return 0;
}
当运行这个程序时,我得到以下输出:
atomic bool is lock free
3.49434 s
2.31755 s
这向我表明 std::mutex
(和 std::unique_lock
)要快得多,这与我在阅读原子与互斥锁时所期望的相反。我的发现是否正确?我的测试程序有问题吗? 我对两者的区别理解有误吗?
代码是在 CentOS7 上用 GCC 4.8.5 编译的
有人告诉我,在互斥锁的某些实现中,最初它会先在内部自旋锁。
这可能就是它看起来更快的原因。
如果进行系统调用,我怀疑你会得到相同的结果。
(我无法验证这一点,但我认为这可能是一个原因)
你在什么硬件上测试?
由于您使用的是 GCC,因此 std::atomic
seq_cst 商店将使用 mov
+ 慢速 mfence
而不是稍微慢一点的 xchg
-with-mem(这也是一个完整的屏障,就像所有其他 x86 原子 RMW 操作一样)。
使用互斥量需要一个原子 RMW(类似于 xchg
,而不是 mov + mfence)。如果你幸运的话,释放互斥体可能只是一个普通的存储(比如 mo_release
)。争用为零,因此获取锁总是成功的。
这些互斥锁定/解锁库函数背后的代码比 mfence
、 更便宜,这当然是合理的,尤其是在具有更新微码的 Skylake CPU 上,其中 mfence
是乱序执行的完整屏障以及内存。 (见底this answer, and also
此外,请注意,您的互斥锁循环将本地 bool var
优化为一个寄存器,实际上并没有在循环内的内存中更新它。 (您的代码 on the Godbolt compiler explorer with gcc4.8.5)。
# the main loop from testMutex
.L80: # do {
mov rdi, rsp # pointer to _mutex on the stack
call __gthrw_pthread_mutex_lock(pthread_mutex_t*)
test eax, eax
jne .L91 # mutex error handling
mov rdi, rsp # pointer to _mutex again
call __gthrw_pthread_mutex_unlock(pthread_mutex_t*)
sub rbx, 1
jne .L80 # }while(--counter)
循环内的 xor bl, 1
是无关紧要的;乱序执行可能会与其他工作重叠。
如果对 var
的引用转义了该函数,因此编译器必须在非内联函数调用(包括对 pthread 库函数)之前将其同步到内存中,我们期望像 xor byte ptr [rsp+8], 1
。这也将是相当便宜的,并且可能大部分被无序执行程序隐藏,尽管 load/ALU/store 可能是耗尽存储缓冲区时必须等待的完整屏障。
正在加速您的 std::atomic
代码:
您似乎有意避免执行原子 RMW,而是加载到 tmp var 中并执行单独的存储。如果您只使用 release 而不是 seq_cst,那么它就可以在 x86 上编译为一条普通的存储指令。 (或者在大多数其他 ISA 上更便宜的障碍)。
bool tmp = _value.load(std::memory_order_relaxed); // or acquire
_value.store(!tmp, std::memory_order_release);
这应该 运行 每次反转大约 6 个周期,只是一个 ALU 操作的延迟加上 store/reload 的存储转发延迟。对于 mfence
(https://uops.info/).
或者由于这是非原子修改,只需存储交替值而不重新读取旧值。在只有一个生产者写入值而其他线程正在读取的情况下,通常只能避免原子 RMW。因此,让生产者将它正在修改的值保存在寄存器中(非原子本地变量),并存储副本。
bool var = true;
for(size_t counter = 0; counter < CYCLES; counter++)
{
var = !var;
_value.store(var, std::memory_order_release);
}
此外,不要在您自己的变量名称中使用前导下划线。这些名称保留用于实现。 (小写的单个 _
仅在文件/全局范围内保留,但这仍然是不好的做法。)