锁定条件变量后如何运行一个函数?
How to run a function after locking on the condition variable?
给定 Future
变量 get_value()
方法的以下函数体:
// Optimization once _is_resolved is set true, we do not need lock anymore
if( _is_resolved ) {
return _value;
}
// If _is_resolved is not set to true, lock and double check _is_resolved
_lock.lock();
if(!_is_resolved) {
++_size;
std::unique_lock<std::mutex> lock(_mutex);
// We cannot call _lock.unlock(); before _condition.wait(lock); because
// 1. It would allow this thread to be preempted
// 2. Then, some other thread could call resolve()
// Once this other thread completes the resolve() call, and this
// thread is rescheduled, we would finally call _condition.wait(lock);
// but doing so, would cause this thread to be locked indefinitely
// because we will never call resolve() anymore
_condition.wait(lock); // How to call _lock.unlock(); after locking?
}
else {
_lock.unlock();
}
return _value;
当变量 _is_resolved
设置为 false 时,我无法正确释放 _lock
,因为在调用 _condition.wait()
之后我无法调用 _lock.unlock()
直到 _condition
变量满足。但是,为了满足条件变量,我需要先释放锁。这是悖论。
我能想到的唯一解决方案是让 _condition
变量在最终锁定后调用 _lock.unlock()
。但是怎么办呢?
完整的最小示例:
#include <cassert>
#include <iostream>
#include <chrono>
#include <mutex>
#include <atomic>
#include <thread>
#include <condition_variable>
std::recursive_mutex _debug_syncronized_semaphore_lock;
#define DB(...) do { \
std::unique_lock<std::recursive_mutex> lock(_debug_syncronized_semaphore_lock); \
std::cout << __VA_ARGS__ << std::flush; } while(0);
template<typename FutureType>
class Future
{
public:
Future(): _size(0), _is_resolved(false) {
DB( "Future(_is_resolved=" << _is_resolved
<< ", _condition=" << _size
<< ") => " << this << std::endl )
}
~Future() {
DB( "~Future(this=" << this << ")" << std::endl );
}
FutureType get_value() {
DB( "Future::get_value(this=" << this
<< " _is_resolved=" << _is_resolved
<< " _condition=" << _size
<< ")" << std::endl )
// Optimization once _is_resolved is set true, we do not need lock anymore
if( _is_resolved ) {
return _value;
}
// If _is_resolved is not set to true, lock and double check _is_resolved
_lock.lock();
if(!_is_resolved) {
++_size;
std::unique_lock<std::mutex> lock(_mutex);
// We cannot call _lock.unlock(); before _condition.wait(lock); because
// 1. It would allow this thread to be preempted
// 2. Then, some other thread could call resolve()
// Once this other thread completes the resolve() call, and this
// thread is rescheduled, we would finally call _condition.wait(lock);
// but doing so, would cause this thread to be locked indefinitely
// because we will never call resolve() anymore
_condition.wait(lock); // How to call _lock.unlock(); after locking?
}
else {
_lock.unlock();
}
return _value;
}
void resolve(FutureType value) {
DB( "Future::resolve(this=" << this
<< " _is_resolved=" << _is_resolved
<< " _condition=" << _size
<< ")" << std::endl )
_lock.lock();
assert(!_is_resolved);
// If the instruction pointer got until here, and the thread is unscheduled,
// and another thread call `resolve()`, then, the `assert` will not work,
// if the whole resolve() call is not atomic.
_value = value;
_is_resolved = true;
_lock.unlock();
_condition.notify_all();
}
private:
FutureType _value;
std::atomic<int> _size;
volatile std::atomic<bool> _is_resolved;
std::mutex _mutex;
std::recursive_mutex _lock;
std::condition_variable _condition;
};
int producerFunction(Future<int>* future) {
DB( "producerFunction ()" << std::endl )
std::this_thread::sleep_for( std::chrono::milliseconds(2000) );
future->resolve(10);
DB( "producerFunction (resolving future=" << future << " to 10)" << std::endl )
return 0;
}
int consumerFunction(Future<int>* future) {
DB( "consumerFunction ()" << std::endl )
auto value = future->get_value();
DB( "consumerFunction (result=" << value << ")" << std::endl )
value = future->get_value();
DB( "consumerFunction (result=" << value << ")" << std::endl )
return 0;
}
int main()
{
DB( "Starting main application..." << std::endl )
Future<int>* future = new Future<int>();
std::thread* consumer = new std::thread(&consumerFunction, future);
std::thread* producer = new std::thread(&producerFunction, future);
consumer->join();
producer->join();
DB( "Exiting main application..." << std::endl )
return 0;
}
调用 wait(lock)
释放 lock
。这里的问题是代码有两个个互斥;据我所知,一个应该就足够了。而且它可能不需要递归。这通常是设计问题的标志。
我建议删除 _lock
并坚持使用 _mutex
。然后更改代码,使其看起来像这样:
if (_is_resolved)
return _value;
std::unique_lock<std::mutex> lock(_mutex);
if (!_is_resolved) {
++_size;
_condition.wait(lock);
}
return _value;
给定 Future
变量 get_value()
方法的以下函数体:
// Optimization once _is_resolved is set true, we do not need lock anymore
if( _is_resolved ) {
return _value;
}
// If _is_resolved is not set to true, lock and double check _is_resolved
_lock.lock();
if(!_is_resolved) {
++_size;
std::unique_lock<std::mutex> lock(_mutex);
// We cannot call _lock.unlock(); before _condition.wait(lock); because
// 1. It would allow this thread to be preempted
// 2. Then, some other thread could call resolve()
// Once this other thread completes the resolve() call, and this
// thread is rescheduled, we would finally call _condition.wait(lock);
// but doing so, would cause this thread to be locked indefinitely
// because we will never call resolve() anymore
_condition.wait(lock); // How to call _lock.unlock(); after locking?
}
else {
_lock.unlock();
}
return _value;
当变量 _is_resolved
设置为 false 时,我无法正确释放 _lock
,因为在调用 _condition.wait()
之后我无法调用 _lock.unlock()
直到 _condition
变量满足。但是,为了满足条件变量,我需要先释放锁。这是悖论。
我能想到的唯一解决方案是让 _condition
变量在最终锁定后调用 _lock.unlock()
。但是怎么办呢?
完整的最小示例:
#include <cassert>
#include <iostream>
#include <chrono>
#include <mutex>
#include <atomic>
#include <thread>
#include <condition_variable>
std::recursive_mutex _debug_syncronized_semaphore_lock;
#define DB(...) do { \
std::unique_lock<std::recursive_mutex> lock(_debug_syncronized_semaphore_lock); \
std::cout << __VA_ARGS__ << std::flush; } while(0);
template<typename FutureType>
class Future
{
public:
Future(): _size(0), _is_resolved(false) {
DB( "Future(_is_resolved=" << _is_resolved
<< ", _condition=" << _size
<< ") => " << this << std::endl )
}
~Future() {
DB( "~Future(this=" << this << ")" << std::endl );
}
FutureType get_value() {
DB( "Future::get_value(this=" << this
<< " _is_resolved=" << _is_resolved
<< " _condition=" << _size
<< ")" << std::endl )
// Optimization once _is_resolved is set true, we do not need lock anymore
if( _is_resolved ) {
return _value;
}
// If _is_resolved is not set to true, lock and double check _is_resolved
_lock.lock();
if(!_is_resolved) {
++_size;
std::unique_lock<std::mutex> lock(_mutex);
// We cannot call _lock.unlock(); before _condition.wait(lock); because
// 1. It would allow this thread to be preempted
// 2. Then, some other thread could call resolve()
// Once this other thread completes the resolve() call, and this
// thread is rescheduled, we would finally call _condition.wait(lock);
// but doing so, would cause this thread to be locked indefinitely
// because we will never call resolve() anymore
_condition.wait(lock); // How to call _lock.unlock(); after locking?
}
else {
_lock.unlock();
}
return _value;
}
void resolve(FutureType value) {
DB( "Future::resolve(this=" << this
<< " _is_resolved=" << _is_resolved
<< " _condition=" << _size
<< ")" << std::endl )
_lock.lock();
assert(!_is_resolved);
// If the instruction pointer got until here, and the thread is unscheduled,
// and another thread call `resolve()`, then, the `assert` will not work,
// if the whole resolve() call is not atomic.
_value = value;
_is_resolved = true;
_lock.unlock();
_condition.notify_all();
}
private:
FutureType _value;
std::atomic<int> _size;
volatile std::atomic<bool> _is_resolved;
std::mutex _mutex;
std::recursive_mutex _lock;
std::condition_variable _condition;
};
int producerFunction(Future<int>* future) {
DB( "producerFunction ()" << std::endl )
std::this_thread::sleep_for( std::chrono::milliseconds(2000) );
future->resolve(10);
DB( "producerFunction (resolving future=" << future << " to 10)" << std::endl )
return 0;
}
int consumerFunction(Future<int>* future) {
DB( "consumerFunction ()" << std::endl )
auto value = future->get_value();
DB( "consumerFunction (result=" << value << ")" << std::endl )
value = future->get_value();
DB( "consumerFunction (result=" << value << ")" << std::endl )
return 0;
}
int main()
{
DB( "Starting main application..." << std::endl )
Future<int>* future = new Future<int>();
std::thread* consumer = new std::thread(&consumerFunction, future);
std::thread* producer = new std::thread(&producerFunction, future);
consumer->join();
producer->join();
DB( "Exiting main application..." << std::endl )
return 0;
}
调用 wait(lock)
释放 lock
。这里的问题是代码有两个个互斥;据我所知,一个应该就足够了。而且它可能不需要递归。这通常是设计问题的标志。
我建议删除 _lock
并坚持使用 _mutex
。然后更改代码,使其看起来像这样:
if (_is_resolved)
return _value;
std::unique_lock<std::mutex> lock(_mutex);
if (!_is_resolved) {
++_size;
_condition.wait(lock);
}
return _value;