C++ 像从另一个线程一样锁定互斥量?

C++ Lock a mutex as if from another thread?

我正在编写一个 Audio class,它包含一个 std::thread 用于异步填充一些缓冲区。假设我们调用主线程 A 和后台(class 成员)线程 B。我正在使用 std::mutex 来在声音未播放时阻止线程 B,那样它就不会 运行 在不需要时在后台运行,并且不会使用过多的 CPU 电源。互斥体默认被线程A锁定,所以线程B被阻塞,然后在播放声音时线程A解锁互斥体,线程B运行s(先加锁然后立即解锁)循环。

当线程 B 发现它已到达文件末尾时,问题就出现了。它可以停止播放和清理缓冲区等,但它不能停止自己的循环,因为线程 B 无法锁定线程 A 的互斥量。

相关代码大纲如下:

class Audio {
private:

    // ...

    std::thread Thread;
    std::mutex PauseMutex;    // mutex that blocks Thread, locked in constructor
    void ThreadFunc();     // assigned to Thread in constructor

 public:

    // ...

    void Play();
    void Stop();
}

_

void Audio::ThreadFunc() {

    // ... (include initial check of mutex here)

    while (!this->EndThread) {    // Thread-safe flag, only set when Audio is destructed

            // ... Check and refill buffers as necessary, etc ...

        if (EOF)
             Stop();

        // Attempt a lock, blocks thread if sound/music is not playing
        this->PauseMutex.lock();
        this->PauseMutex.unlock();
    }
}

void Audio::Play() {
     // ...
     PauseMutex.unlock();     // unlock mutex so loop in ThreadFunc can start
}

void Audio::Stop() {
     // ...
     PauseMutex.lock();     // locks mutex to stop loop in ThreadFunc
     // ^^ This is the issue here
}

在上面的设置中,当后台线程发现它到达 EOF 时,它会调用 class 的 Stop() 函数,据说该函数会锁定互斥锁以停止后台线程。这是行不通的,因为互斥量必须由主线程锁定,而不是后台线程(在这个例子中,它在 ThreadFunc 中崩溃,因为后台线程在已经锁定后试图在其主循环中锁定Stop()).

在这一点上,我唯一能想到的就是以某种方式让后台线程锁定互斥量 ,就好像 它是主线程一样,让主线程拥有互斥量...如果可能的话?有没有办法让线程将互斥体的所有权转移给另一个线程?或者这是我创建的设置中的设计缺陷? (如果是后者,是否有任何合理的解决方法?)class 中的其他所有内容目前都按设计工作。

我什至不会假装理解你的代码是如何尝试做它正在做的事情的。然而,有一件事是显而易见的。您正在尝试使用互斥锁来传达某些谓词状态更改,这是在该高速公路上行驶的错误车辆。

谓词状态变化是通过耦合三件事来处理的:

  • 一些谓词数据
  • 保护谓词的互斥量
  • 一个条件变量,用于传达谓词状态可能的变化。

目标

以下示例的目的是演示在控制跨多个线程的程序流时如何协同使用互斥体、条件变量和谓词数据。它显示了使用 waitwait_for 条件变量功能的示例,以及将成员函数 运行 作为线程过程的一种方法。


以下是一个简单的 Player class 在四种可能状态之间切换:

  • Stopped : 播放器没有播放,没有暂停,也没有退出。
  • Playing : 播放器正在播放
  • Paused : 播放器暂停,一旦恢复播放将从中断处继续。
  • 退出 : 播放器应该停止正在做的事情并终止。

谓词数据相当明显。 state 成员。它必须受到保护,这意味着除非在互斥锁的保护下,否则不能更改或检查。我在其中添加了一个 counter ,它在保持播放状态一段时间的过程中简单地增加。更具体地说:

  • 播放时,每 200 毫秒 counter 递增,然后将一些数据转储到控制台。
  • 暂停时,counter 没有改变,但在播放时保留其最后一个值。这意味着当恢复时它将从停止的地方继续。
  • 停止时,counter 重置为零,并且换行符被注入到控制台输出中。这意味着切换回播放将重新开始计数器序列。
  • 设置退出状态对 counter 没有影响,它将与其他所有内容一起消失。

代码

#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <unistd.h>

using namespace std::chrono_literals;

struct Player
{
private:
    std::mutex mtx;
    std::condition_variable cv;
    std::thread thr;

    enum State
    {
        Stopped,
        Paused,
        Playing,
        Quit
    };

    State state;
    int counter;

    void signal_state(State st)
    {
        std::unique_lock<std::mutex> lock(mtx);
        if (st != state)
        {
            state = st;
            cv.notify_one();
        }
    }

    // main player monitor
    void monitor()
    {
        std::unique_lock<std::mutex> lock(mtx);
        bool bQuit = false;

        while (!bQuit)
        {
            switch (state)
            {
                case Playing:
                    std::cout << ++counter << '.';
                    cv.wait_for(lock, 200ms, [this](){ return state != Playing; });
                    break;

                case Stopped:
                    cv.wait(lock, [this]() { return state != Stopped; });
                    std::cout << '\n';
                    counter = 0;
                    break;

                case Paused:
                    cv.wait(lock, [this]() { return state != Paused; });
                    break;

                case Quit:
                    bQuit = true;
                    break;
            }
        }
    }

public:
    Player()
        : state(Stopped)
        , counter(0)
    {
        thr = std::thread(std::bind(&Player::monitor, this));
    }

    ~Player()
    {
        quit();
        thr.join();
    }

    void stop() { signal_state(Stopped); }
    void play() { signal_state(Playing); }
    void pause() { signal_state(Paused); }
    void quit() { signal_state(Quit); }
};

int main()
{
    Player player;
    player.play();
    sleep(3);
    player.pause();
    sleep(3);
    player.play();
    sleep(3);
    player.stop();
    sleep(3);
    player.play();
    sleep(3);
}

输出

我真的无法证明这一点。您必须 运行 它并了解它是如何工作的,我邀请您像我上面那样玩弄 main() 中的状态。但是请注意,一旦 quit 被调用,其他状态的 none 将被监控。设置退出状态将关闭监视器线程。对于它的价值,上面的 运行 应该看起来像这样:

1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.
1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.

将第一组数字分成两组(1..15,然后是 16..30),结果是播放,然后暂停,然后再次播放。然后发出停止信号,然后再播放约 3 秒。之后,对象自毁,并在这样做时设置退出状态,并等待监视器终止。

总结

希望你能从中有所收获。如果您发现自己试图通过手动锁定和释放互斥锁来管理谓词状态,那么您需要一个条件变量设计模式来帮助检测这些变化。

希望你能从中有所收获。

class CtLockCS
{
public:
    //--------------------------------------------------------------------------
    CtLockCS()      { ::InitializeCriticalSection(&m_cs); }
    //--------------------------------------------------------------------------
    ~CtLockCS()     { ::DeleteCriticalSection(&m_cs); }
    //--------------------------------------------------------------------------
    bool TryLock()  { return ::TryEnterCriticalSection(&m_cs) == TRUE; }
    //--------------------------------------------------------------------------
    void Lock()     { ::EnterCriticalSection(&m_cs); }
    //--------------------------------------------------------------------------
    void Unlock()   { ::LeaveCriticalSection(&m_cs); }
    //--------------------------------------------------------------------------
protected:
    CRITICAL_SECTION m_cs;
};


///////////////////////////////////////////////////////////////////////////////
// class CtLockMX - using mutex

class CtLockMX
{
public:
    //--------------------------------------------------------------------------
    CtLockMX(const TCHAR* nameMutex = 0)
        { m_mx = ::CreateMutex(0, FALSE, nameMutex); }
    //--------------------------------------------------------------------------
    ~CtLockMX()
        { if (m_mx) { ::CloseHandle(m_mx); m_mx = NULL; } }
    //--------------------------------------------------------------------------
    bool TryLock()
        {   return m_mx ? (::WaitForSingleObject(m_mx, 0) == WAIT_OBJECT_0) : false; }
    //--------------------------------------------------------------------------
    void Lock()
        {   if (m_mx)   { ::WaitForSingleObject(m_mx, INFINITE); }  }
    //--------------------------------------------------------------------------
    void Unlock()
        {   if (m_mx)   { ::ReleaseMutex(m_mx); }   }
    //--------------------------------------------------------------------------
protected:
    HANDLE  m_mx;
};


///////////////////////////////////////////////////////////////////////////////
// class CtLockSM - using semaphore

class CtLockSM
{
public:
    //--------------------------------------------------------------------------
    CtLockSM(int maxcnt)    { m_sm = ::CreateSemaphore(0, maxcnt, maxcnt, 0); }
    //--------------------------------------------------------------------------
    ~CtLockSM()             { ::CloseHandle(m_sm); }
    //--------------------------------------------------------------------------
    bool TryLock()          { return m_sm ? (::WaitForSingleObject(m_sm, 0) == WAIT_OBJECT_0) : false;  }
    //--------------------------------------------------------------------------
    void Lock()             { if (m_sm) { ::WaitForSingleObject(m_sm, INFINITE); }  }
    //--------------------------------------------------------------------------
    void Unlock()
    {
        if (m_sm){
            LONG prevcnt = 0;
            ::ReleaseSemaphore(m_sm, 1, &prevcnt);
        }
    }
    //--------------------------------------------------------------------------
protected:
    HANDLE  m_sm;
};