实现一个类似于 Qt 的高性能互斥锁
Implement a high performance mutex similar to Qt's one
我有一个多线程科学应用程序,其中多个计算线程(每个内核一个)必须将它们的结果存储在一个公共缓冲区中。这需要互斥机制。
工作线程只花费一小部分时间写入缓冲区,因此互斥锁大部分时间都处于解锁状态,并且锁很有可能立即成功,而无需等待另一个线程解锁。
目前,我已经使用 Qt 的 QMutex 来完成任务,并且效果很好:互斥锁的开销可以忽略不计。
但是,我只能将它移植到 c++11/STL。使用 std::mutex 时,性能下降 66%,线程大部分时间都在锁定互斥体。
经过另一个问题,我认为 Qt 使用基于简单原子标志的快速锁定机制,针对互斥量尚未锁定的情况进行了优化。当并发锁定发生时回退到系统互斥锁。
我想在 STL 中实现它。有没有基于 std::atomic 和 std::mutex 的简单方法?我已经深入研究了 Qt 的代码,但它对我的使用来说似乎过于复杂(我不需要锁超时、pimpl、小占用空间等...)。
编辑:我试过自旋锁,但效果不佳,因为:
另一个线程定期(每隔几秒)锁定互斥锁并刷新缓冲区。这需要一些时间,因此此时所有工作线程都会被阻塞。自旋锁使调度变得繁忙,导致刷新比使用适当的互斥锁慢 10-100 倍。这是不可接受的
编辑:我已经试过了,但它不起作用(锁定所有线程)
class Mutex
{
public:
Mutex() : lockCounter(0) { }
void lock()
{
if(lockCounter.fetch_add(1, std::memory_order_acquire)>0)
{
std::unique_lock<std::mutex> lock(internalMutex);
cv.wait(lock);
}
}
void unlock();
{
if(lockCounter.fetch_sub(1, std::memory_order_release)>1)
{
cv.notify_one();
}
}
private:
std::atomic<int> lockCounter;
std::mutex internalMutex;
std::condition_variable cv;
};
谢谢!
编辑:最终解决方案
MikeMB 的快速互斥锁运行良好。
作为最终的解决方案,我做了:
- 使用带有 try_lock
的简单自旋锁
- 当一个线程try_lock失败时,它们不会等待,而是填充一个队列(不与其他线程共享)并继续
- 当线程获得锁时,它会用当前结果更新缓冲区,但也会用存储在队列中的结果(它处理它的队列)
- 缓冲区刷新效率更高:阻塞部分仅交换两个指针。
一般建议
正如一些评论中提到的,我首先要看一下,您是否可以重构您的程序设计,使互斥实现对您的性能不那么重要。
此外,由于标准 C++ 中的多线程支持非常新并且有些幼稚,因此您有时只需要依靠特定于平台的机制,例如linux 系统上的 futex
或 windows 上的关键部分或 Qt 等非标准库。
话虽如此,我可以想到两种可能会加速您的程序的实现方法:
自旋锁
如果访问冲突很少发生,并且互斥锁只持有很短的时间(当然,无论如何都应该努力实现两件事),那么只使用自旋锁可能是最有效的,因为它不需要任何系统完全调用并且实现起来很简单(取自cppreference):
class SpinLock {
std::atomic_flag locked ;
public:
void lock() {
while (locked.test_and_set(std::memory_order_acquire)) {
std::this_thread::yield(); //<- this is not in the source but might improve performance.
}
}
void unlock() {
locked.clear(std::memory_order_release);
}
};
缺点当然是等待线程不会保持睡眠状态并占用处理时间。
检查锁定
这基本上就是您展示的想法:您首先快速检查是否确实需要基于原子交换操作的锁定,只有在不可避免时才使用重 std::mutex
。
struct FastMux {
//Status of the fast mutex
std::atomic<bool> locked;
//helper mutex and vc on which threads can wait in case of collision
std::mutex mux;
std::condition_variable cv;
//the maximum number of threads that might be waiting on the cv (conservative estimation)
std::atomic<int> cntr;
FastMux():locked(false), cntr(0){}
void lock() {
if (locked.exchange(true)) {
cntr++;
{
std::unique_lock<std::mutex> ul(mux);
cv.wait(ul, [&]{return !locked.exchange(true); });
}
cntr--;
}
}
void unlock() {
locked = false;
if (cntr > 0){
std::lock_guard<std::mutex> ul(mux);
cv.notify_one();
}
}
};
请注意,std::mutex
未锁定在 lock()
和 unlock()
之间,但它仅用于处理条件变量。如果互斥体上存在高度拥塞,这会导致更多的锁定/解锁调用。
您的实现的问题是,cv.notify_one();
可能会在 if(lockCounter.fetch_add(1, std::memory_order_acquire)>0)
和 cv.wait(lock);
之间被调用,因此您的线程可能永远不会唤醒。
虽然我没有针对您提议的实施的固定版本进行任何性能比较,因此您只需要看看最适合您的方法即可。
不是每个定义的真正答案,但根据具体任务,无锁队列可能有助于完全摆脱互斥锁。如果您有多个生产者和一个消费者(甚至多个消费者),这将有助于设计。链接:
- 虽然不是直接C++/STL,Boost.Lockfree提供了这样一个队列。
- 另一种选择是 Anthony Williams "C++ Concurrency in Action" 中的无锁队列实现。
- A Fast Lock-Free Queue for C++
更新 评论:
队列大小/溢出:
- 可以通过以下方式避免队列溢出:i) 使队列足够大或 ii) 让生产者线程在队列满后等待并推送数据。
- 另一种选择是使用多个消费者和多个队列并实施并行缩减,但这取决于数据的处理方式。
消费者线程:
- 队列可以使用
std::condition_variable
并让消费者线程等待直到有数据。
- 另一种选择是使用计时器定期检查(轮询)队列是否为空,一旦队列非空,线程可以连续获取数据并返回等待模式。
我有一个多线程科学应用程序,其中多个计算线程(每个内核一个)必须将它们的结果存储在一个公共缓冲区中。这需要互斥机制。
工作线程只花费一小部分时间写入缓冲区,因此互斥锁大部分时间都处于解锁状态,并且锁很有可能立即成功,而无需等待另一个线程解锁。
目前,我已经使用 Qt 的 QMutex 来完成任务,并且效果很好:互斥锁的开销可以忽略不计。
但是,我只能将它移植到 c++11/STL。使用 std::mutex 时,性能下降 66%,线程大部分时间都在锁定互斥体。
经过另一个问题,我认为 Qt 使用基于简单原子标志的快速锁定机制,针对互斥量尚未锁定的情况进行了优化。当并发锁定发生时回退到系统互斥锁。
我想在 STL 中实现它。有没有基于 std::atomic 和 std::mutex 的简单方法?我已经深入研究了 Qt 的代码,但它对我的使用来说似乎过于复杂(我不需要锁超时、pimpl、小占用空间等...)。
编辑:我试过自旋锁,但效果不佳,因为:
另一个线程定期(每隔几秒)锁定互斥锁并刷新缓冲区。这需要一些时间,因此此时所有工作线程都会被阻塞。自旋锁使调度变得繁忙,导致刷新比使用适当的互斥锁慢 10-100 倍。这是不可接受的
编辑:我已经试过了,但它不起作用(锁定所有线程)
class Mutex
{
public:
Mutex() : lockCounter(0) { }
void lock()
{
if(lockCounter.fetch_add(1, std::memory_order_acquire)>0)
{
std::unique_lock<std::mutex> lock(internalMutex);
cv.wait(lock);
}
}
void unlock();
{
if(lockCounter.fetch_sub(1, std::memory_order_release)>1)
{
cv.notify_one();
}
}
private:
std::atomic<int> lockCounter;
std::mutex internalMutex;
std::condition_variable cv;
};
谢谢!
编辑:最终解决方案
MikeMB 的快速互斥锁运行良好。
作为最终的解决方案,我做了:
- 使用带有 try_lock 的简单自旋锁
- 当一个线程try_lock失败时,它们不会等待,而是填充一个队列(不与其他线程共享)并继续
- 当线程获得锁时,它会用当前结果更新缓冲区,但也会用存储在队列中的结果(它处理它的队列)
- 缓冲区刷新效率更高:阻塞部分仅交换两个指针。
一般建议
正如一些评论中提到的,我首先要看一下,您是否可以重构您的程序设计,使互斥实现对您的性能不那么重要。
此外,由于标准 C++ 中的多线程支持非常新并且有些幼稚,因此您有时只需要依靠特定于平台的机制,例如linux 系统上的 futex
或 windows 上的关键部分或 Qt 等非标准库。
话虽如此,我可以想到两种可能会加速您的程序的实现方法:
自旋锁
如果访问冲突很少发生,并且互斥锁只持有很短的时间(当然,无论如何都应该努力实现两件事),那么只使用自旋锁可能是最有效的,因为它不需要任何系统完全调用并且实现起来很简单(取自cppreference):
class SpinLock {
std::atomic_flag locked ;
public:
void lock() {
while (locked.test_and_set(std::memory_order_acquire)) {
std::this_thread::yield(); //<- this is not in the source but might improve performance.
}
}
void unlock() {
locked.clear(std::memory_order_release);
}
};
缺点当然是等待线程不会保持睡眠状态并占用处理时间。
检查锁定
这基本上就是您展示的想法:您首先快速检查是否确实需要基于原子交换操作的锁定,只有在不可避免时才使用重 std::mutex
。
struct FastMux {
//Status of the fast mutex
std::atomic<bool> locked;
//helper mutex and vc on which threads can wait in case of collision
std::mutex mux;
std::condition_variable cv;
//the maximum number of threads that might be waiting on the cv (conservative estimation)
std::atomic<int> cntr;
FastMux():locked(false), cntr(0){}
void lock() {
if (locked.exchange(true)) {
cntr++;
{
std::unique_lock<std::mutex> ul(mux);
cv.wait(ul, [&]{return !locked.exchange(true); });
}
cntr--;
}
}
void unlock() {
locked = false;
if (cntr > 0){
std::lock_guard<std::mutex> ul(mux);
cv.notify_one();
}
}
};
请注意,std::mutex
未锁定在 lock()
和 unlock()
之间,但它仅用于处理条件变量。如果互斥体上存在高度拥塞,这会导致更多的锁定/解锁调用。
您的实现的问题是,cv.notify_one();
可能会在 if(lockCounter.fetch_add(1, std::memory_order_acquire)>0)
和 cv.wait(lock);
之间被调用,因此您的线程可能永远不会唤醒。
虽然我没有针对您提议的实施的固定版本进行任何性能比较,因此您只需要看看最适合您的方法即可。
不是每个定义的真正答案,但根据具体任务,无锁队列可能有助于完全摆脱互斥锁。如果您有多个生产者和一个消费者(甚至多个消费者),这将有助于设计。链接:
- 虽然不是直接C++/STL,Boost.Lockfree提供了这样一个队列。
- 另一种选择是 Anthony Williams "C++ Concurrency in Action" 中的无锁队列实现。
- A Fast Lock-Free Queue for C++
更新 评论:
队列大小/溢出:
- 可以通过以下方式避免队列溢出:i) 使队列足够大或 ii) 让生产者线程在队列满后等待并推送数据。
- 另一种选择是使用多个消费者和多个队列并实施并行缩减,但这取决于数据的处理方式。
消费者线程:
- 队列可以使用
std::condition_variable
并让消费者线程等待直到有数据。 - 另一种选择是使用计时器定期检查(轮询)队列是否为空,一旦队列非空,线程可以连续获取数据并返回等待模式。