锁定条件变量后如何运行一个函数?

How to run a function after locking on the condition variable?

给定 Future 变量 get_value() 方法的以下函数体:

// Optimization once _is_resolved is set true, we do not need lock anymore
if( _is_resolved ) {
    return _value;
}

// If _is_resolved is not set to true, lock and double check _is_resolved
_lock.lock();
if(!_is_resolved) {
    ++_size;
    std::unique_lock<std::mutex> lock(_mutex);

    // We cannot call _lock.unlock(); before _condition.wait(lock); because
    // 1. It would allow this thread to be preempted
    // 2. Then, some other thread could call resolve()
    // Once this other thread completes the resolve() call, and this
    // thread is rescheduled, we would finally call _condition.wait(lock);
    // but doing so, would cause this thread to be locked indefinitely
    // because we will never call resolve() anymore
    _condition.wait(lock); // How to call _lock.unlock(); after locking?
}
else {
    _lock.unlock();
}
return _value;

当变量 _is_resolved 设置为 false 时,我无法正确释放 _lock,因为在调用 _condition.wait() 之后我无法调用 _lock.unlock() 直到 _condition变量满足。但是,为了满足条件变量,我需要先释放锁。这是悖论。

我能想到的唯一解决方案是让 _condition 变量在最终锁定后调用 _lock.unlock()。但是怎么办呢?

完整的最小示例:

#include <cassert>
#include <iostream>
#include <chrono>

#include <mutex>
#include <atomic>
#include <thread>
#include <condition_variable>

std::recursive_mutex _debug_syncronized_semaphore_lock;

#define DB(...) do { \
        std::unique_lock<std::recursive_mutex> lock(_debug_syncronized_semaphore_lock); \
        std::cout << __VA_ARGS__ << std::flush; } while(0);

template<typename FutureType>
class Future
{
public:
    Future(): _size(0), _is_resolved(false) {
        DB( "Future(_is_resolved=" << _is_resolved
                << ", _condition=" << _size
                << ") => " << this << std::endl )
    }

    ~Future() {
        DB( "~Future(this=" << this << ")" << std::endl );
    }

    FutureType get_value() {
        DB( "Future::get_value(this=" << this
                << " _is_resolved=" << _is_resolved
                << " _condition=" << _size
                <<  ")" << std::endl )
        // Optimization once _is_resolved is set true, we do not need lock anymore
        if( _is_resolved ) {
            return _value;
        }

        // If _is_resolved is not set to true, lock and double check _is_resolved
        _lock.lock();
        if(!_is_resolved) {
            ++_size;
            std::unique_lock<std::mutex> lock(_mutex);

            // We cannot call _lock.unlock(); before _condition.wait(lock); because
            // 1. It would allow this thread to be preempted
            // 2. Then, some other thread could call resolve()
            // Once this other thread completes the resolve() call, and this
            // thread is rescheduled, we would finally call _condition.wait(lock);
            // but doing so, would cause this thread to be locked indefinitely
            // because we will never call resolve() anymore
            _condition.wait(lock); // How to call _lock.unlock(); after locking?
        }
        else {
            _lock.unlock();
        }
        return _value;
    }

    void resolve(FutureType value) {
        DB( "Future::resolve(this=" << this
                << " _is_resolved=" << _is_resolved
                << " _condition=" << _size
                <<  ")" << std::endl )
        _lock.lock();
        assert(!_is_resolved);
        // If the instruction pointer got until here, and the thread is unscheduled,
        // and another thread call `resolve()`, then, the `assert` will not work,
        // if the whole resolve() call is not atomic.
        _value = value;
        _is_resolved = true;
        _lock.unlock();
        _condition.notify_all();
    }

private:
    FutureType _value;
    std::atomic<int> _size;
    volatile std::atomic<bool> _is_resolved;

    std::mutex _mutex;
    std::recursive_mutex _lock;
    std::condition_variable _condition;
};


int producerFunction(Future<int>* future) {
    DB( "producerFunction ()" << std::endl )

    std::this_thread::sleep_for( std::chrono::milliseconds(2000) );
    future->resolve(10);

    DB( "producerFunction (resolving future=" << future << " to 10)" << std::endl )
    return 0;
}

int consumerFunction(Future<int>* future) {
    DB( "consumerFunction ()" << std::endl )

    auto value = future->get_value();
    DB( "consumerFunction (result=" << value << ")" << std::endl )

    value = future->get_value();
    DB( "consumerFunction (result=" << value << ")" << std::endl )
    return 0;
}

int main()
{
    DB( "Starting main application..." << std::endl )
    Future<int>* future = new Future<int>();

    std::thread* consumer = new std::thread(&consumerFunction, future);
    std::thread* producer = new std::thread(&producerFunction, future);

    consumer->join();
    producer->join();

    DB( "Exiting main application..." << std::endl )
    return 0;
}

调用 wait(lock) 释放 lock。这里的问题是代码有两个个互斥;据我所知,一个应该就足够了。而且它可能不需要递归。这通常是设计问题的标志。

我建议删除 _lock 并坚持使用 _mutex。然后更改代码,使其看起来像这样:

if (_is_resolved)
    return _value;
std::unique_lock<std::mutex> lock(_mutex);
if (!_is_resolved) {
    ++_size;
    _condition.wait(lock);
}
return _value;