实现一个类似于 Qt 的高性能互斥锁

Implement a high performance mutex similar to Qt's one

我有一个多线程科学应用程序,其中多个计算线程(每个内核一个)必须将它们的结果存储在一个公共缓冲区中。这需要互斥机制。

工作线程只花费一小部分时间写入缓冲区,因此互斥锁大部分时间都处于解锁状态,并且锁很有可能立即成功,而无需等待另一个线程解锁。

目前,我已经使用 Qt 的 QMutex 来完成任务,并且效果很好:互斥锁的开销可以忽略不计。

但是,我只能将它移植到 c++11/STL。使用 std::mutex 时,性能下降 66%,线程大部分时间都在锁定互斥体。

经过另一个问题,我认为 Qt 使用基于简单原子标志的快速锁定机制,针对互斥量尚未锁定的情况进行了优化。当并发锁定发生时回退到系统互斥锁。

我想在 STL 中实现它。有没有基于 std::atomic 和 std::mutex 的简单方法?我已经深入研究了 Qt 的代码,但它对我的使用来说似乎过于复杂(我不需要锁超时、pimpl、小占用空间等...)。

编辑:我试过自旋锁,但效果不佳,因为:

另一个线程定期(每隔几秒)锁定互斥锁并刷新缓冲区。这需要一些时间,因此此时所有工作线程都会被阻塞。自旋锁使调度变得繁忙,导致刷新比使用适当的互斥锁慢 10-100 倍。这是不可接受的

编辑:我已经试过了,但它不起作用(锁定所有线程)

class Mutex
{
public:
    Mutex() : lockCounter(0) { }

    void lock()
    {
        if(lockCounter.fetch_add(1, std::memory_order_acquire)>0)
        {
            std::unique_lock<std::mutex> lock(internalMutex);
            cv.wait(lock);
        }
    }

    void unlock();
    {
        if(lockCounter.fetch_sub(1, std::memory_order_release)>1)
        {
            cv.notify_one();
        }
    }


private:
    std::atomic<int> lockCounter;
    std::mutex internalMutex;
    std::condition_variable cv;
};

谢谢!

编辑:最终解决方案

MikeMB 的快速互斥锁运行良好。

作为最终的解决方案,我做了:

一般建议

正如一些评论中提到的,我首先要看一下,您是否可以重构您的程序设计,使互斥实现对您的性能不那么重要。
此外,由于标准 C++ 中的多线程支持非常新并且有些幼稚,因此您有时只需要依靠特定于平台的机制,例如linux 系统上的 futex 或 windows 上的关键部分或 Qt 等非标准库。
话虽如此,我可以想到两种可能会加速您的程序的实现方法:

自旋锁
如果访问冲突很少发生,并且互斥锁只持有很短的时间(当然,无论如何都应该努力实现两件事),那么只使用自旋锁可能是最有效的,因为它不需要任何系统完全调用并且实现起来很简单(取自cppreference):

class SpinLock {
    std::atomic_flag locked ;
public:
    void lock() {
        while (locked.test_and_set(std::memory_order_acquire)) { 
             std::this_thread::yield(); //<- this is not in the source but might improve performance. 
        }
    }
    void unlock() {
        locked.clear(std::memory_order_release);
    }
};

缺点当然是等待线程不会保持睡眠状态并占用处理时间。

检查锁定

这基本上就是您展示的想法:您首先快速检查是否确实需要基于原子交换操作的锁定,只有在不可避免时才使用重 std::mutex

struct FastMux {
    //Status of the fast mutex
    std::atomic<bool> locked;
    //helper mutex and vc on which threads can wait in case of collision
    std::mutex mux;
    std::condition_variable cv;
    //the maximum number of threads that might be waiting on the cv (conservative estimation)
    std::atomic<int> cntr; 

    FastMux():locked(false), cntr(0){}

    void lock() {
        if (locked.exchange(true)) {
            cntr++;
            {
                std::unique_lock<std::mutex> ul(mux);
                cv.wait(ul, [&]{return !locked.exchange(true); });
            }
            cntr--;
        }
    }
    void unlock() {
        locked = false;
        if (cntr > 0){
            std::lock_guard<std::mutex> ul(mux);
            cv.notify_one();
        }
    }
};

请注意,std::mutex 未锁定在 lock()unlock() 之间,但它仅用于处理条件变量。如果互斥体上存在高度拥塞,这会导致更多的锁定/解锁调用。

您的实现的问题是,cv.notify_one(); 可能会在 if(lockCounter.fetch_add(1, std::memory_order_acquire)>0)cv.wait(lock); 之间被调用,因此您的线程可能永远不会唤醒。

虽然我没有针对您提议的实施的固定版本进行任何性能比较,因此您只需要看看最适合您的方法即可。

不是每个定义的真正答案,但根据具体任务,无锁队列可能有助于完全摆脱互斥锁。如果您有多个生产者和一个消费者(甚至多个消费者),这将有助于设计。链接:

更新 评论:

队列大小/溢出:

  • 可以通过以下方式避免队列溢出:i) 使队列足够大或 ii) 让生产者线程在队列满后等待并推送数据。
  • 另一种选择是使用多个消费者和多个队列并实施并行缩减,但这取决于数据的处理方式。

消费者线程:

  • 队列可以使用 std::condition_variable 并让消费者线程等待直到有数据。
  • 另一种选择是使用计时器定期检查(轮询)队列是否为空,一旦队列非空,线程可以连续获取数据并返回等待模式。