无法将元素添加到共享指针的线程安全锁定队列
unable to add elements to thread safe locking queue of shared pointers
我正在尝试使用 C++11 并发技术创建基于线程间消息的通信。 Anthony William 的书 'Concurrency in Action' 描述了此实现所基于的线程安全锁定队列。书中描述的线程安全锁定队列与我想要实现的队列之间的区别首先是我使用通用引用将队列元素转发到阻塞队列,其次(这就是问题所在)我需要能够存储一个 std::shared_ptr 指针队列,因为模板类型由一个简单的消息 class 层次结构和一个抽象基础 class 和子 classes 组成实际的专门消息。我需要使用共享指针来避免数据切片。
编辑:我添加了一个 coliru 演示来更清楚地显示我的问题。
Live Demo
编辑 1:对 coliru 实时演示的更多更新以及额外的编译器错误:
Coliru Demo With Compiler Errors
编辑 2:感谢亚历杭德罗,我有了一个可行的解决方案 Working Coliru Example
为此我改变了Anthony William对底层消息队列的实现:
std::queue<T> data_queue
到
std::queue<std::shared_ptr<T>> data_queue
但是当我尝试通过通用引用完美转发签名将消息指针推送到队列时,我遇到了各种错误。
我希望能够在此队列上添加消息的方式如下:
UtlThreadSafeQueue<BaseMessageType>& mDataLoadSessionQ;
auto message = std::make_shared<DerivedType>(1,2,3);
mDataLoadSessionQ.push(BaseType);
With the above code, the compiler complains indicating something along
the following lines error C2664: 'void
UtlThreadSafeQueue::push(T &&)' : cannot convert
argument 1 from 'std::shared_ptr' to
'BaseMessageType &&' with T=BaseMessageType
我想我需要一些方法来专门化指针类型,但我不确定。
我的实现如下:
/*
** code adapted from Anthony Williams's book C++ Concurrency in Action
** Pages 74-75.
**
*/
#ifndef _utlThreadSafeQueue_h_
#define _utlThreadSafeQueue_h_
// SYSTEM INCLUDES
#include <atomic>
#include <queue>
#include <limits>
#include <memory>
#include <mutex>
#include <condition_variable>
// APPLICATION INCLUDES
// MACROS
#if defined (_WIN32) && (defined (max) || defined (min))
// Windows uses min/max macros
#undef min
#undef max
#endif
// EXTERNAL FUNCTIONS
// EXTERNAL VARIABLES
// CONSTANTS
// STRUCTS
template<typename T>
class UtlThreadSafeQueue {
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T>> data_queue;
std::condition_variable data_cond;
std::size_t capacity;
std::atomic<bool> shutdownFlag;
public:
explicit UtlThreadSafeQueue(const size_t& rCapacity =
std::numeric_limits<std::size_t>::max())
: mut()
, data_queue()
, data_cond()
, capacity(rCapacity)
, shutdownFlag(false)
{}
UtlThreadSafeQueue(UtlThreadSafeQueue const& rhs) {
std::lock_guard<std::mutex> lock(rhs.mut);
data_queue = rhs.data_queue;
}
virtual ~UtlThreadSafeQueue() = default;
// move aware push
inline void push(T&& value) {
std::unique_lock<std::mutex> lock(mut);
// only add the value on the stack if there is room
data_cond.wait(lock,[this]{return (data_queue.size() < capacity) || shutdownFlag;});
data_queue.emplace(std::forward<T>(value));
data_cond.notify_one();
}
// wait for non empty lambda condition before returning value
inline void wait_and_pop(T& rValue) {
std::unique_lock<std::mutex> lock(mut);
data_cond.wait(lock,[this]{return !data_queue.empty();});
// ideally should return an invalid value
if (!shutdownFlag) {
rValue = data_queue.front();
data_queue.pop();
}
}
// wait for non empty lambda condition before returning shared pointer to value
inline std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lock(mut);
data_cond.wait(lock,[this]{return !data_queue.empty() || shutdownFlag;});
if (shutdownFlag) {
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
return nullptr;
}
// return value in specified reference and flag indicating whether value
// successfully returned or not
inline bool try_pop(T& rValue) {
std::lock_guard<std::mutex> lock(mut);
if (data_queue.empty()) {
return false;
}
rValue = data_queue.front();
data_queue.pop();
return true;
}
// return shared pointer to value - which if set to nullptr,
// indicates container was empty at the time of the call.
inline std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lock(mut);
if (data_queue.empty()) {
return std::shared_ptr<T>();
}
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
// thread safe method to check if the queue is empty
// note that if it is empty
inline bool empty() const {
std::lock_guard<std::mutex> lock(mut);
return data_queue.empty();
}
// shutdown support - wake up potentially sleeping queues
inline void shutdown() {
shutdownFlag = true;
data_cond.notify_all();
}
};
#endif // _utlThreadSafeQueue_h_
经过评论中的扩展讨论,并根据 Coliru 链接,我想我理解了您最初尝试做的事情,我也想对您的数据结构提出一些建议。
您提到您的 push()
函数是移动感知的。出色的!但是,小心点。
如果您查看 push
函数的定义方式,
inline void push(T&& value)
我想在这里指出几件事。首先是这只会绑定到右值引用和 而不是 通用引用(或者 forwarding references ,因为它们很快就会被称为 )。您在 push
中使用 std::forward
是不恰当的做法(尽管在技术上是正确的)。原因是类型 T
已经在 class 级别推导出来(当您实例化 UtlThreadSafeQueue
时)。要获得完美的转发语义,您需要 push
如下所示:
template<typename U>
inline void push(U&& value) { ... }
此版本的 push
接受任何类型的引用,正如您所期望的那样。但是,它的用途是 将 沿着任何参数转发到适当的 constructor/function/etc。由于您希望维护一个内部 std::queue<std::shared_ptr<BaseMessage>>
,您可以有一个 push
接受对 BaseMessage
的派生类型的引用(左值或右值),并放置一个 std::shared_ptr<DerivedType>
进入队列。这将建立指针到基的关系(std::shared_ptr<BaseMessage> base_ptr = derived_ptr
,其中 derived_ptr
是 std::shared_ptr<DerivedMessage>
类型)。这可以通过以下方式完成:
template<typename U>
inline
std::enable_if_t<std::is_base_of<T,std::decay_t<U>>::value> push(U&& value)
{
std::unique_lock<std::mutex> lock(mut);
// only add the value on the stack if there is room
data_cond.wait(lock,[this]{return (data_queue.size() < capacity) || shutdownFlag;});
data_queue.emplace(std::make_shared<std::decay_t<U>> (std::forward<U>(value)));
data_cond.notify_one();
}
std::enable_if_t
的使用确保只有从 BaseMessage
派生的类型被传递到 push
函数。 std::make_shared<std::decay_t<U>> (std::forward<U>(value))
队列中的位置将调用 std::shared_ptr
的第 9 个构造函数(如 Here 所列)。
这样做的好处在于,它允许您和您的用户编写如下代码:
UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ(10);
StringMessage sm("Hi there!");
IntegerMessage im(4242);
dataLoadSessionQ.push(sm);
dataLoadSessionQ.push(im);
它会按预期运行。每个 Message 都由 lvalue-ref 传入,并且 shared_ptr
通过调用派生类型的复制构造函数来创建。
你公开了一个 push
接口,它不仅接受 std::shared_ptr
,而且接受 std::shared_ptr&&
,它有一些微妙之处。
乍一看,似乎我无法做到这一点(从您的 Coliru 链接中借用 StringMessage
和 BaseMessage
类型):
UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ;
auto my_message = std::make_shared<StringMessage>("Another message!");
dataLoadSessionQ.push(my_message);
尽管 push
被定义为对 shared_ptr
进行右值引用,但此代码通过传递 my_message
进行编译(这不是右值参考!)。起初我并不清楚原因。但是,事实证明,类似于 static_cast
,有一个为 shared_ptr
定义的 static_pointer_cast
,它看起来像下面这样(从 Here 借来的):
template< class T, class U >
std::shared_ptr<T> static_pointer_cast( const std::shared_ptr<U>& r );
如果转换成功,它将执行从 shared_ptr<U>
到 shared_ptr<T>
的转换。因为您的 Coliru 示例在内部使用 std::queue<std::shared_ptr<BaseMessage>>
,而您正试图推送 shared_ptr<StringMessage>
,隐式 到 shared_ptr<BaseMessage>
的转换成功,因为 StringMessage
继承自 BaseMessage
。转换 return 是一个别名构造的 std::shared_ptr<BaseMessage>
,它将愉快地绑定到右值引用。
请注意,如果您尝试这样做:
UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ;
auto generic_message = std::make_shared<BaseMessage>();
dataLoadSessionQ.push(generic_message);
您得到了我们(或者可能只是我)最初期待的编译器错误
error: cannot bind 'std::shared_ptr' lvalue to 'std::shared_ptr&&'
dataLoadSessionQ.push(generic_message);
老实说,无论是从性能还是美学方面,我都找不到必须将 shared_ptr<Derived>
传递给 UtlThreadSafeQueue<Base>
的充分理由。我希望能够传入 both 一个临时的 Derived
和一个左值,而不用担心队列的内部结构很多。
您还可以通过 std::move
ing 来自 data_queue.front()
的值来利用 wait_and_pop()
/try_pop()
that return shared_ptr
s无论如何在下一次调用 data_queue.pop()
)
时被破坏
在您的 UtlThreadSafeQueue
构造函数中,我还会考虑将 const std::size_t&
更改为按值 std::size_t
并且对于(示例)IntegerMessage
类型也是如此。
考虑到这一点,对于我上面强调的更改的任何反馈,我将不胜感激 - 坦率地说,直到很久以后,当您发布更多示例并继续编辑问题时,我才能理解您的目标/实施。
我正在尝试使用 C++11 并发技术创建基于线程间消息的通信。 Anthony William 的书 'Concurrency in Action' 描述了此实现所基于的线程安全锁定队列。书中描述的线程安全锁定队列与我想要实现的队列之间的区别首先是我使用通用引用将队列元素转发到阻塞队列,其次(这就是问题所在)我需要能够存储一个 std::shared_ptr 指针队列,因为模板类型由一个简单的消息 class 层次结构和一个抽象基础 class 和子 classes 组成实际的专门消息。我需要使用共享指针来避免数据切片。
编辑:我添加了一个 coliru 演示来更清楚地显示我的问题。 Live Demo
编辑 1:对 coliru 实时演示的更多更新以及额外的编译器错误: Coliru Demo With Compiler Errors
编辑 2:感谢亚历杭德罗,我有了一个可行的解决方案 Working Coliru Example
为此我改变了Anthony William对底层消息队列的实现:
std::queue<T> data_queue
到
std::queue<std::shared_ptr<T>> data_queue
但是当我尝试通过通用引用完美转发签名将消息指针推送到队列时,我遇到了各种错误。
我希望能够在此队列上添加消息的方式如下:
UtlThreadSafeQueue<BaseMessageType>& mDataLoadSessionQ;
auto message = std::make_shared<DerivedType>(1,2,3);
mDataLoadSessionQ.push(BaseType);
With the above code, the compiler complains indicating something along the following lines error C2664: 'void UtlThreadSafeQueue::push(T &&)' : cannot convert argument 1 from 'std::shared_ptr' to 'BaseMessageType &&' with T=BaseMessageType
我想我需要一些方法来专门化指针类型,但我不确定。
我的实现如下:
/*
** code adapted from Anthony Williams's book C++ Concurrency in Action
** Pages 74-75.
**
*/
#ifndef _utlThreadSafeQueue_h_
#define _utlThreadSafeQueue_h_
// SYSTEM INCLUDES
#include <atomic>
#include <queue>
#include <limits>
#include <memory>
#include <mutex>
#include <condition_variable>
// APPLICATION INCLUDES
// MACROS
#if defined (_WIN32) && (defined (max) || defined (min))
// Windows uses min/max macros
#undef min
#undef max
#endif
// EXTERNAL FUNCTIONS
// EXTERNAL VARIABLES
// CONSTANTS
// STRUCTS
template<typename T>
class UtlThreadSafeQueue {
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T>> data_queue;
std::condition_variable data_cond;
std::size_t capacity;
std::atomic<bool> shutdownFlag;
public:
explicit UtlThreadSafeQueue(const size_t& rCapacity =
std::numeric_limits<std::size_t>::max())
: mut()
, data_queue()
, data_cond()
, capacity(rCapacity)
, shutdownFlag(false)
{}
UtlThreadSafeQueue(UtlThreadSafeQueue const& rhs) {
std::lock_guard<std::mutex> lock(rhs.mut);
data_queue = rhs.data_queue;
}
virtual ~UtlThreadSafeQueue() = default;
// move aware push
inline void push(T&& value) {
std::unique_lock<std::mutex> lock(mut);
// only add the value on the stack if there is room
data_cond.wait(lock,[this]{return (data_queue.size() < capacity) || shutdownFlag;});
data_queue.emplace(std::forward<T>(value));
data_cond.notify_one();
}
// wait for non empty lambda condition before returning value
inline void wait_and_pop(T& rValue) {
std::unique_lock<std::mutex> lock(mut);
data_cond.wait(lock,[this]{return !data_queue.empty();});
// ideally should return an invalid value
if (!shutdownFlag) {
rValue = data_queue.front();
data_queue.pop();
}
}
// wait for non empty lambda condition before returning shared pointer to value
inline std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lock(mut);
data_cond.wait(lock,[this]{return !data_queue.empty() || shutdownFlag;});
if (shutdownFlag) {
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
return nullptr;
}
// return value in specified reference and flag indicating whether value
// successfully returned or not
inline bool try_pop(T& rValue) {
std::lock_guard<std::mutex> lock(mut);
if (data_queue.empty()) {
return false;
}
rValue = data_queue.front();
data_queue.pop();
return true;
}
// return shared pointer to value - which if set to nullptr,
// indicates container was empty at the time of the call.
inline std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lock(mut);
if (data_queue.empty()) {
return std::shared_ptr<T>();
}
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
// thread safe method to check if the queue is empty
// note that if it is empty
inline bool empty() const {
std::lock_guard<std::mutex> lock(mut);
return data_queue.empty();
}
// shutdown support - wake up potentially sleeping queues
inline void shutdown() {
shutdownFlag = true;
data_cond.notify_all();
}
};
#endif // _utlThreadSafeQueue_h_
经过评论中的扩展讨论,并根据 Coliru 链接,我想我理解了您最初尝试做的事情,我也想对您的数据结构提出一些建议。
您提到您的
push()
函数是移动感知的。出色的!但是,小心点。如果您查看
push
函数的定义方式,inline void push(T&& value)
我想在这里指出几件事。首先是这只会绑定到右值引用和 而不是 通用引用(或者 forwarding references ,因为它们很快就会被称为 )。您在
push
中使用std::forward
是不恰当的做法(尽管在技术上是正确的)。原因是类型T
已经在 class 级别推导出来(当您实例化UtlThreadSafeQueue
时)。要获得完美的转发语义,您需要push
如下所示:template<typename U> inline void push(U&& value) { ... }
此版本的
push
接受任何类型的引用,正如您所期望的那样。但是,它的用途是 将 沿着任何参数转发到适当的 constructor/function/etc。由于您希望维护一个内部std::queue<std::shared_ptr<BaseMessage>>
,您可以有一个push
接受对BaseMessage
的派生类型的引用(左值或右值),并放置一个std::shared_ptr<DerivedType>
进入队列。这将建立指针到基的关系(std::shared_ptr<BaseMessage> base_ptr = derived_ptr
,其中derived_ptr
是std::shared_ptr<DerivedMessage>
类型)。这可以通过以下方式完成:template<typename U> inline std::enable_if_t<std::is_base_of<T,std::decay_t<U>>::value> push(U&& value) { std::unique_lock<std::mutex> lock(mut); // only add the value on the stack if there is room data_cond.wait(lock,[this]{return (data_queue.size() < capacity) || shutdownFlag;}); data_queue.emplace(std::make_shared<std::decay_t<U>> (std::forward<U>(value))); data_cond.notify_one(); }
std::enable_if_t
的使用确保只有从BaseMessage
派生的类型被传递到push
函数。std::make_shared<std::decay_t<U>> (std::forward<U>(value))
队列中的位置将调用std::shared_ptr
的第 9 个构造函数(如 Here 所列)。这样做的好处在于,它允许您和您的用户编写如下代码:
UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ(10); StringMessage sm("Hi there!"); IntegerMessage im(4242); dataLoadSessionQ.push(sm); dataLoadSessionQ.push(im);
它会按预期运行。每个 Message 都由 lvalue-ref 传入,并且
shared_ptr
通过调用派生类型的复制构造函数来创建。你公开了一个
push
接口,它不仅接受std::shared_ptr
,而且接受std::shared_ptr&&
,它有一些微妙之处。乍一看,似乎我无法做到这一点(从您的 Coliru 链接中借用
StringMessage
和BaseMessage
类型):UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ; auto my_message = std::make_shared<StringMessage>("Another message!"); dataLoadSessionQ.push(my_message);
尽管
push
被定义为对shared_ptr
进行右值引用,但此代码通过传递my_message
进行编译(这不是右值参考!)。起初我并不清楚原因。但是,事实证明,类似于static_cast
,有一个为shared_ptr
定义的static_pointer_cast
,它看起来像下面这样(从 Here 借来的):template< class T, class U > std::shared_ptr<T> static_pointer_cast( const std::shared_ptr<U>& r );
如果转换成功,它将执行从
shared_ptr<U>
到shared_ptr<T>
的转换。因为您的 Coliru 示例在内部使用std::queue<std::shared_ptr<BaseMessage>>
,而您正试图推送shared_ptr<StringMessage>
,隐式 到shared_ptr<BaseMessage>
的转换成功,因为StringMessage
继承自BaseMessage
。转换 return 是一个别名构造的std::shared_ptr<BaseMessage>
,它将愉快地绑定到右值引用。请注意,如果您尝试这样做:
UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ; auto generic_message = std::make_shared<BaseMessage>(); dataLoadSessionQ.push(generic_message);
您得到了我们(或者可能只是我)最初期待的编译器错误
error: cannot bind 'std::shared_ptr' lvalue to 'std::shared_ptr&&' dataLoadSessionQ.push(generic_message);
老实说,无论是从性能还是美学方面,我都找不到必须将 shared_ptr<Derived>
传递给 UtlThreadSafeQueue<Base>
的充分理由。我希望能够传入 both 一个临时的 Derived
和一个左值,而不用担心队列的内部结构很多。
您还可以通过 std::move
ing 来自 data_queue.front()
的值来利用 wait_and_pop()
/try_pop()
that return shared_ptr
s无论如何在下一次调用 data_queue.pop()
)
在您的 UtlThreadSafeQueue
构造函数中,我还会考虑将 const std::size_t&
更改为按值 std::size_t
并且对于(示例)IntegerMessage
类型也是如此。
考虑到这一点,对于我上面强调的更改的任何反馈,我将不胜感激 - 坦率地说,直到很久以后,当您发布更多示例并继续编辑问题时,我才能理解您的目标/实施。