用无锁数据传输替换 std::mutex 队列
replace std::mutex queue with lock less transfer of data
目前我试图理解 std::atomic 的语义,尤其是 memory_order_acquire/memory_order_release。
我读了这个博客post
http://preshing.com/20140709/the-purpose-of-memory_order_consume-in-cpp11/
但我不确定我尝试在示例中实现的模式是否正确。
目的是用无锁操作替换经典的std::mutex锁定"pass data from one thread to a queue"、"Grab with an other thread from the queue"。
我的想法是当guard为1时receiver从payload中读取,当guard为0时Sender写入payload
我 运行 这个样本循环很多,断言从未被触发。我知道,要证明 运行 大量循环没有问题是很困难甚至不可能的...
#include <atomic>
#include <thread>
#include <cassert>
std::atomic<int> Guard(0);
int payload = 0;
void receiver()
{
for (int i = 0; i < 100000; ++i)
{
int g;
while ((g = Guard.load(std::memory_order_acquire)) == 0);
assert(payload == 42);
payload = 0;
Guard.store(0, std::memory_order_release);
}
}
void sender()
{
for (int i = 0; i < 100000; ++i)
{
int g;
while ((g = Guard.load(std::memory_order_acquire)) != 0);
assert(payload == 0);
payload = 42;
Guard.store(1, std::memory_order_release);
}
}
int main(int argc, char** argv)
{
for (int i = 0; i < 10; ++i) {
std::thread t1(receiver);
std::thread t2(sender);
t1.join();
t2.join();
}
}
您的代码不是无锁的。
这可能会让人感到惊讶,因为您没有在任何地方使用 std::mutex
,但仅此还不足以让您获得锁定自由。问题是,如果原子 Guard
没有预期值,您仍然会发送线程自旋等待。它们很可能永远旋转,除非另一个线程将它们释放。
这是(自旋)锁的定义。您只是使用原子学自己实现了它,而不是使用库中的预制锁。
为了实现 truly lock-free,您需要一个复杂得多的实现。
For a data structure to qualify as lock-free, if any thread performing
an operation on the data structure is suspended at any point during
that operation then the other threads accessing the data structure
must still be able to complete their tasks. This is the fundamental
restriction which distinguishes it from non-blocking data structures
that use spin-locks or other busy-wait mechanisms.
您的实施情况并非如此。您需要始终获得一个 sender
线程,后跟一个 receiver
线程。如果 receiver
先出现,它将被阻塞直到 sender
完成。如果那没有发生,receiver
将永远被阻塞,在忙碌等待的同时快乐地燃烧 CPU 时间。同样,如果第二个 sender
在第一个之后开始,它将被阻塞,直到 receiver
完成执行并重置守卫。
多个发送者和接收者同时 运行 将导致竞争,尽管这可能是这里有意限制的。
设计合适的无锁算法比使用锁要难几个数量级。很容易搞砸,这几乎是荒谬的。如果您仍然 100% 决心尝试一下,请确保您对原子有坚实的理解,并为自己准备一堆关于该主题的文献。然后看看现有的实现,例如 Boost.Lockfree。有龙!别说我没提醒你
目前我试图理解 std::atomic 的语义,尤其是 memory_order_acquire/memory_order_release。
我读了这个博客post http://preshing.com/20140709/the-purpose-of-memory_order_consume-in-cpp11/ 但我不确定我尝试在示例中实现的模式是否正确。
目的是用无锁操作替换经典的std::mutex锁定"pass data from one thread to a queue"、"Grab with an other thread from the queue"。
我的想法是当guard为1时receiver从payload中读取,当guard为0时Sender写入payload
我 运行 这个样本循环很多,断言从未被触发。我知道,要证明 运行 大量循环没有问题是很困难甚至不可能的...
#include <atomic>
#include <thread>
#include <cassert>
std::atomic<int> Guard(0);
int payload = 0;
void receiver()
{
for (int i = 0; i < 100000; ++i)
{
int g;
while ((g = Guard.load(std::memory_order_acquire)) == 0);
assert(payload == 42);
payload = 0;
Guard.store(0, std::memory_order_release);
}
}
void sender()
{
for (int i = 0; i < 100000; ++i)
{
int g;
while ((g = Guard.load(std::memory_order_acquire)) != 0);
assert(payload == 0);
payload = 42;
Guard.store(1, std::memory_order_release);
}
}
int main(int argc, char** argv)
{
for (int i = 0; i < 10; ++i) {
std::thread t1(receiver);
std::thread t2(sender);
t1.join();
t2.join();
}
}
您的代码不是无锁的。
这可能会让人感到惊讶,因为您没有在任何地方使用 std::mutex
,但仅此还不足以让您获得锁定自由。问题是,如果原子 Guard
没有预期值,您仍然会发送线程自旋等待。它们很可能永远旋转,除非另一个线程将它们释放。
这是(自旋)锁的定义。您只是使用原子学自己实现了它,而不是使用库中的预制锁。
为了实现 truly lock-free,您需要一个复杂得多的实现。
For a data structure to qualify as lock-free, if any thread performing an operation on the data structure is suspended at any point during that operation then the other threads accessing the data structure must still be able to complete their tasks. This is the fundamental restriction which distinguishes it from non-blocking data structures that use spin-locks or other busy-wait mechanisms.
您的实施情况并非如此。您需要始终获得一个 sender
线程,后跟一个 receiver
线程。如果 receiver
先出现,它将被阻塞直到 sender
完成。如果那没有发生,receiver
将永远被阻塞,在忙碌等待的同时快乐地燃烧 CPU 时间。同样,如果第二个 sender
在第一个之后开始,它将被阻塞,直到 receiver
完成执行并重置守卫。
多个发送者和接收者同时 运行 将导致竞争,尽管这可能是这里有意限制的。
设计合适的无锁算法比使用锁要难几个数量级。很容易搞砸,这几乎是荒谬的。如果您仍然 100% 决心尝试一下,请确保您对原子有坚实的理解,并为自己准备一堆关于该主题的文献。然后看看现有的实现,例如 Boost.Lockfree。有龙!别说我没提醒你