为无锁队列添加阻塞函数
Adding blocking functions to lock-free queue
我有一个基于循环缓冲区的无锁多生产者、单一消费者队列。到目前为止,它只有非阻塞的 push_back()
和 pop_front()
调用。现在我想添加这些调用的阻塞版本,但我想尽量减少这对使用非阻塞版本的代码性能的影响——也就是说,它不应该把它们变成“lock-by -default" 调用。
例如阻塞 push_back() 的最简单版本如下所示:
void push_back_Blocking(const T& pkg) {
if (!push_back(pkg)) {
unique_lock<mutex> ul(mux);
while (!push_back(pkg)) {
cv_notFull.wait(ul);
}
}
}
但不幸的是,这还需要将以下块放在 "non-blocking" pop_front()
的末尾:
{
std::lock_guard<mutex> lg(mux);
cv_notFull.notify_all();
}
虽然 notify
本身几乎没有任何性能影响(如果没有线程在等待),但锁有。
所以我的问题是:
我如何(尽可能使用标准 c++14)将阻塞 push_back
和 pop_front
成员函数添加到我的队列中,而不会严重阻碍 non_blocking 对应项的性能(阅读:最小化系统调用)? 至少只要没有线程实际被阻塞——但理想情况下即使如此。
作为参考,我当前的版本看起来与此类似(我省略了调试检查、数据对齐和显式内存排序):
template<class T, size_t N>
class MPSC_queue {
using INDEX_TYPE = unsigned long;
struct Idx {
INDEX_TYPE idx;
INDEX_TYPE version_cnt;
};
enum class SlotState {
EMPTY,
FILLED
};
struct Slot {
Slot() = default;
std::atomic<SlotState> state= SlotState::EMPTY;
T data{};
};
struct Buffer_t {
std::array<Slot, N> data{};
Buffer_t() {
data.fill(Slot{ SlotState::EMPTY, T{} });
}
Slot& operator[](Idx idx) {
return this->operator[](idx.idx);
}
Slot& operator[](INDEX_TYPE idx) {
return data[idx];
}
};
Buffer_t buffer;
std::atomic<Idx> head{};
std::atomic<INDEX_TYPE> tail=0;
INDEX_TYPE next(INDEX_TYPE old) { return (old + 1) % N; }
Idx next(Idx old) {
old.idx = next(old.idx);
old.version_cnt++;
return old;
}
public:
bool push_back(const T& val) {
auto tHead = head.load();
Idx wrtIdx;
do {
wrtIdx = next(tHead);
if (wrtIdx.idx == tail) {
return false;
}
} while (!head.compare_exchange_strong(tHead, wrtIdx));
buffer[wrtIdx].data = val;
buffer[wrtIdx].state = SlotState::FILLED;
return true;
}
bool pop_front(T& val) {
auto rIdx = next(tail);
if (buffer[rIdx].state != SlotState::FILLED) {
return false;
}
val = buffer[rIdx].data;
buffer[rIdx].state = SlotState::EMPTY;
tail = rIdx;
return true;
}
};
相关问题:
我问了一个类似的问题,专门关于优化 condition_variable::notify
here, but the question got closed as a supposedly duplicate of this question 的使用。
我不同意,因为这个问题是关于为什么一般情况下条件变量需要互斥量(或者更确切地说它是 pthread 等价物) - 关注 condition_variable::wait
- 而不是 if/how 它可以避免 notify
部分。但显然我没有说清楚(或者人们只是不同意我的观点)。
无论如何,链接问题中的答案对我没有帮助,因为无论如何这有点 XY-problem,我决定再问一个关于我遇到的实际问题的问题,从而允许更广泛的讨论可能的解决方案范围(也许有一种方法可以完全避免条件变量)。
This question也很相似,但是
- 它是关于 linux 上的 C 的,答案使用特定于平台的
构造(pthreads 和 futexes)
- 那里的作者要求有效的阻塞调用,但根本没有非阻塞调用。另一方面,我不太关心阻塞的效率,但希望尽可能快地保持非阻塞的效率。
如果条件变量上有潜在 服务员,您必须 锁定notify_all
调用的互斥量。
问题是 条件检查 (!push_back(pkg)
) 在 之前 等待 条件变量(C++11 没有提供其他方式)。所以互斥是唯一可以保证这些动作之间一致性的方法。
但是在没有潜在服务员参与的情况下,可以省略锁定(和通知)。只需使用附加标志:
class MPSC_queue {
... // Original definitions
std::atomic<bool> has_waiters;
public:
void push_back_Blocking(const T& pkg) {
if (!push_back(pkg)) {
unique_lock<mutex> ul(mux);
has_waiters.store(true, std::memory_order_relaxed); // #1
while (!push_back(pkg)) { // #2 inside push_back() method
cv_notFull.wait(ul);
// Other waiter may clean flag while we wait. Set it again. Same as #1.
has_waiters.store(true, std::memory_order_relaxed);
}
has_waiters.store(false, std::memory_order_relaxed);
}
}
// Method is same as original, exposed only for #2 mark.
bool push_back(const T& val) {
auto tHead = head.load();
Idx wrtIdx;
do {
wrtIdx = next(tHead);
if (wrtIdx.idx == tail) { // #2
return false;
}
} while (!head.compare_exchange_strong(tHead, wrtIdx));
buffer[wrtIdx].data = val;
buffer[wrtIdx].state = SlotState::FILLED;
return true;
}
bool pop_front(T& val) {
// Main work, same as original pop_front, exposed only for #3 mark.
auto rIdx = next(tail);
if (buffer[rIdx].state != SlotState::FILLED) {
return false;
}
val = buffer[rIdx].data;
buffer[rIdx].state = SlotState::EMPTY;
tail = rIdx; // #3
// Notification part
if(has_waiters.load(std::memory_order_relaxed)) // #4
{
// There are potential waiters. Need to lock.
std::lock_guard<mutex> lg(mux);
cv_notFull.notify_all();
}
return true;
}
};
这里的关键关系是:
- 在
#1
处设置标志并在 #2
处读取 tail
以检查条件。
- 在
#3
存储 tail
并在 #4
检查标志。
这两种关系都应该揭示某种普遍秩序。也就是说 #1
应该在 #2
之前被其他线程观察到。 #3
和 #4
.
相同
在那种情况下,可以保证,如果检查标志 #4
发现它没有设置,那么可能进一步的条件检查 #2
将发现条件变化的影响 #3
。所以不加锁(和通知)是安全的,因为没有waiter是不可能的。
在您当前的实现中,#1
和 #2
之间的通用顺序 是通过使用隐式 加载 tail
[提供的]memory_order_seq_cst。 #3
和 #4
之间的相同顺序通过使用隐式 memory_order_seq_cst.
存储 tail
来提供
在那种方法中,"Do not lock if no waiters"、通用顺序是最棘手的部分。在这两种关系中,都是Read After Write顺序,这是memory_order_acquire和的任意组合无法实现的memory_order_release。所以应该用memory_order_seq_cst
我有一个基于循环缓冲区的无锁多生产者、单一消费者队列。到目前为止,它只有非阻塞的 push_back()
和 pop_front()
调用。现在我想添加这些调用的阻塞版本,但我想尽量减少这对使用非阻塞版本的代码性能的影响——也就是说,它不应该把它们变成“lock-by -default" 调用。
例如阻塞 push_back() 的最简单版本如下所示:
void push_back_Blocking(const T& pkg) {
if (!push_back(pkg)) {
unique_lock<mutex> ul(mux);
while (!push_back(pkg)) {
cv_notFull.wait(ul);
}
}
}
但不幸的是,这还需要将以下块放在 "non-blocking" pop_front()
的末尾:
{
std::lock_guard<mutex> lg(mux);
cv_notFull.notify_all();
}
虽然 notify
本身几乎没有任何性能影响(如果没有线程在等待),但锁有。
所以我的问题是:
我如何(尽可能使用标准 c++14)将阻塞 push_back
和 pop_front
成员函数添加到我的队列中,而不会严重阻碍 non_blocking 对应项的性能(阅读:最小化系统调用)? 至少只要没有线程实际被阻塞——但理想情况下即使如此。
作为参考,我当前的版本看起来与此类似(我省略了调试检查、数据对齐和显式内存排序):
template<class T, size_t N>
class MPSC_queue {
using INDEX_TYPE = unsigned long;
struct Idx {
INDEX_TYPE idx;
INDEX_TYPE version_cnt;
};
enum class SlotState {
EMPTY,
FILLED
};
struct Slot {
Slot() = default;
std::atomic<SlotState> state= SlotState::EMPTY;
T data{};
};
struct Buffer_t {
std::array<Slot, N> data{};
Buffer_t() {
data.fill(Slot{ SlotState::EMPTY, T{} });
}
Slot& operator[](Idx idx) {
return this->operator[](idx.idx);
}
Slot& operator[](INDEX_TYPE idx) {
return data[idx];
}
};
Buffer_t buffer;
std::atomic<Idx> head{};
std::atomic<INDEX_TYPE> tail=0;
INDEX_TYPE next(INDEX_TYPE old) { return (old + 1) % N; }
Idx next(Idx old) {
old.idx = next(old.idx);
old.version_cnt++;
return old;
}
public:
bool push_back(const T& val) {
auto tHead = head.load();
Idx wrtIdx;
do {
wrtIdx = next(tHead);
if (wrtIdx.idx == tail) {
return false;
}
} while (!head.compare_exchange_strong(tHead, wrtIdx));
buffer[wrtIdx].data = val;
buffer[wrtIdx].state = SlotState::FILLED;
return true;
}
bool pop_front(T& val) {
auto rIdx = next(tail);
if (buffer[rIdx].state != SlotState::FILLED) {
return false;
}
val = buffer[rIdx].data;
buffer[rIdx].state = SlotState::EMPTY;
tail = rIdx;
return true;
}
};
相关问题:
我问了一个类似的问题,专门关于优化 condition_variable::notify
here, but the question got closed as a supposedly duplicate of this question 的使用。
我不同意,因为这个问题是关于为什么一般情况下条件变量需要互斥量(或者更确切地说它是 pthread 等价物) - 关注 condition_variable::wait
- 而不是 if/how 它可以避免 notify
部分。但显然我没有说清楚(或者人们只是不同意我的观点)。
无论如何,链接问题中的答案对我没有帮助,因为无论如何这有点 XY-problem,我决定再问一个关于我遇到的实际问题的问题,从而允许更广泛的讨论可能的解决方案范围(也许有一种方法可以完全避免条件变量)。
This question也很相似,但是
- 它是关于 linux 上的 C 的,答案使用特定于平台的 构造(pthreads 和 futexes)
- 那里的作者要求有效的阻塞调用,但根本没有非阻塞调用。另一方面,我不太关心阻塞的效率,但希望尽可能快地保持非阻塞的效率。
如果条件变量上有潜在 服务员,您必须 锁定notify_all
调用的互斥量。
问题是 条件检查 (!push_back(pkg)
) 在 之前 等待 条件变量(C++11 没有提供其他方式)。所以互斥是唯一可以保证这些动作之间一致性的方法。
但是在没有潜在服务员参与的情况下,可以省略锁定(和通知)。只需使用附加标志:
class MPSC_queue {
... // Original definitions
std::atomic<bool> has_waiters;
public:
void push_back_Blocking(const T& pkg) {
if (!push_back(pkg)) {
unique_lock<mutex> ul(mux);
has_waiters.store(true, std::memory_order_relaxed); // #1
while (!push_back(pkg)) { // #2 inside push_back() method
cv_notFull.wait(ul);
// Other waiter may clean flag while we wait. Set it again. Same as #1.
has_waiters.store(true, std::memory_order_relaxed);
}
has_waiters.store(false, std::memory_order_relaxed);
}
}
// Method is same as original, exposed only for #2 mark.
bool push_back(const T& val) {
auto tHead = head.load();
Idx wrtIdx;
do {
wrtIdx = next(tHead);
if (wrtIdx.idx == tail) { // #2
return false;
}
} while (!head.compare_exchange_strong(tHead, wrtIdx));
buffer[wrtIdx].data = val;
buffer[wrtIdx].state = SlotState::FILLED;
return true;
}
bool pop_front(T& val) {
// Main work, same as original pop_front, exposed only for #3 mark.
auto rIdx = next(tail);
if (buffer[rIdx].state != SlotState::FILLED) {
return false;
}
val = buffer[rIdx].data;
buffer[rIdx].state = SlotState::EMPTY;
tail = rIdx; // #3
// Notification part
if(has_waiters.load(std::memory_order_relaxed)) // #4
{
// There are potential waiters. Need to lock.
std::lock_guard<mutex> lg(mux);
cv_notFull.notify_all();
}
return true;
}
};
这里的关键关系是:
- 在
#1
处设置标志并在#2
处读取tail
以检查条件。 - 在
#3
存储tail
并在#4
检查标志。
这两种关系都应该揭示某种普遍秩序。也就是说 #1
应该在 #2
之前被其他线程观察到。 #3
和 #4
.
在那种情况下,可以保证,如果检查标志 #4
发现它没有设置,那么可能进一步的条件检查 #2
将发现条件变化的影响 #3
。所以不加锁(和通知)是安全的,因为没有waiter是不可能的。
在您当前的实现中,#1
和 #2
之间的通用顺序 是通过使用隐式 加载 tail
[提供的]memory_order_seq_cst。 #3
和 #4
之间的相同顺序通过使用隐式 memory_order_seq_cst.
tail
来提供
在那种方法中,"Do not lock if no waiters"、通用顺序是最棘手的部分。在这两种关系中,都是Read After Write顺序,这是memory_order_acquire和的任意组合无法实现的memory_order_release。所以应该用memory_order_seq_cst