在不同线程上发布作品时如何将信号连接到 boost::asio::io_service?
How to connect signal to boost::asio::io_service when posting work on different thread?
我正在尝试使用 boost::lockfree 队列来管理任务。这些任务检索数据并将在工作线程上进行处理。检索数据后,应将信号与数据一起发送到主线程。工作线程在应用程序启动时产生,并不断轮询队列。我是 Boost::Asio 的新手,但根据我的研究,它似乎是在线程之间发送信号的最佳机制。
我看过几个例子,特别是:
- Confused when boost::asio::io_service run method blocks/unblocks
- boost asio post not working , io_service::run exits right after post
这是我的代码:
#include "stdafx.h"
#include <thread>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/optional.hpp>
#include <boost/thread.hpp>
#include <boost/signals2.hpp>
typedef boost::signals2::signal<void(int)> signal_type;
class Task
{
public:
Task(int handle) : _handle(handle) {};
~Task() {};
virtual void Execute()
{
int result = _handle * 2;
}
private:
int _handle;
};
class Manager
{
public:
Manager()
{
_mainService = std::make_shared<boost::asio::io_service>();
_workerService = std::make_shared<boost::asio::io_service>();
_work = std::make_shared<boost::asio::io_service::work>(*_workerService);
_threadStarted = false;
Start();
};
~Manager() {};
void WorkerMain()
{
_workerService->poll();
}
void Start()
{
if (_threadStarted) return;
_workerThread = std::thread(&Manager::WorkerMain, this);
_threadStarted = true;
}
void Stop()
{
if (_threadStarted == false) return;
_mainService->stop();
_workerThread.join();
_mainService.reset();
}
void OnSignalFetchCompleted(int value)
{
int asdf = 0; //do stuff with data on main thread
}
void ProcessData(signal_type& signal)
{
int i = 0;
do
{
_queue.consume_one([&](std::shared_ptr<Task> task)
{
task->Execute();
//get data from task; send out signal with data
});
i++;
} while (i < 3);
}
void QueueData(int handle)
{
_signalFetchCompleted.connect(boost::bind(&Manager::OnSignalFetchCompleted, this, _1));
_workerService->post(boost::bind(&Manager::ProcessData, boost::ref(_signalFetchCompleted))); //!!does not compile
std::shared_ptr<Task> task = std::make_shared<Task>(handle);
_queue.push(task);
}
private:
boost::lockfree::spsc_queue<std::shared_ptr<Task>, boost::lockfree::capacity<1024>> _queue;
std::thread _workerThread;
bool _threadStarted;
std::shared_ptr<boost::asio::io_service> _mainService;
std::shared_ptr<boost::asio::io_service> _workerService;
std::shared_ptr<boost::asio::io_service::work> _work;
signal_type _signalFetchCompleted;
};
int _tmain(int argc, _TCHAR* argv[])
{
std::shared_ptr<Manager> mgr = std::make_shared<Manager>();
mgr->QueueData(5);
mgr->QueueData(10);
mgr->Stop();
return 0;
}
我在 _workerService->Post 行遇到一个我无法解决的编译错误:
1>C:\Boost\boost/bind/mem_fn.hpp(333): error C2784: 'T *boost::get_pointer(const boost::scoped_ptr<T> &)' : could not deduce template argument for 'const boost::scoped_ptr<T> &' from 'const signal_type'
1> C:\Boost\boost/smart_ptr/scoped_ptr.hpp(150) : see declaration of 'boost::get_pointer'
1> C:\Boost\boost/bind/mem_fn.hpp(352) : see reference to function template instantiation 'R (__cdecl &boost::_mfi::dm<R,Manager>::call<const U>(U &,const void *) const)' being compiled
1> with
1> [
1> R=void (signal_type &)
1> , U=signal_type
1> ]
1> C:\Boost\boost/bind/mem_fn.hpp(352) : see reference to function template instantiation 'R (__cdecl &boost::_mfi::dm<R,Manager>::call<const U>(U &,const void *) const)' being compiled
1> with
1> [
1> R=void (signal_type &)
1> , U=signal_type
1> ]
1> C:\Boost\boost/bind/bind.hpp(243) : see reference to function template instantiation 'R (__cdecl &boost::_mfi::dm<R,Manager>::operator ()<T>(const U &) const)' being compiled
1> with
1> [
1> R=void (signal_type &)
1> , T=signal_type
1> , U=signal_type
1> ]
对于解决此编译错误的任何帮助或对此方法的一般评论,我们将不胜感激。谢谢。
编译器错误似乎是在谈论您构建了一个 boost::asio::io_service::work
对象并且您向其传递了不正确的参数:
error C2664: 'boost::asio::io_service::work::work(const boost::asio::io_service::work &)' : cannot convert argument 1 from 'std::shared_ptr<boost::asio::io_service>' to 'boost::asio::io_service &'
boost::asio::io_service::work
有一个构造函数,它接受一个 boost::asio::io_service&
和一个复制构造函数;但是,您传递的是 std::shared_ptr<boost::asio::io_service>
:
_work = std::make_shared<boost::asio::io_service::work>(_workerService);
在这里,_workerService
是一个std::shared_ptr<boost::asio::io_service>
,但是你需要一个boost::asio::io_service&
。请尝试以下操作:
_work = std::make_shared<boost::asio::io_service::work>(*_workerService);
我认为 boost::asio 不是您任务的最佳解决方案。你读过条件变量吗?它们要简单得多,可用于实现您的目标。
根据新信息,问题出在您的 boost::bind
上。您正在尝试调用一个没有对象的成员函数来调用它:您正在尝试调用 ProcessData
但您没有告诉绑定您希望在哪个对象上调用它。你需要给它一个 Manager
来调用它:
_workerService->post(boost::bind(&Manager::ProcessData, this, boost::ref(_signalFetchCompleted)));
这将在 this
上调用 ProcessData
并传入对 _signalFetchCompleted
的引用
我正在尝试使用 boost::lockfree 队列来管理任务。这些任务检索数据并将在工作线程上进行处理。检索数据后,应将信号与数据一起发送到主线程。工作线程在应用程序启动时产生,并不断轮询队列。我是 Boost::Asio 的新手,但根据我的研究,它似乎是在线程之间发送信号的最佳机制。
我看过几个例子,特别是:
- Confused when boost::asio::io_service run method blocks/unblocks
- boost asio post not working , io_service::run exits right after post
这是我的代码:
#include "stdafx.h"
#include <thread>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/optional.hpp>
#include <boost/thread.hpp>
#include <boost/signals2.hpp>
typedef boost::signals2::signal<void(int)> signal_type;
class Task
{
public:
Task(int handle) : _handle(handle) {};
~Task() {};
virtual void Execute()
{
int result = _handle * 2;
}
private:
int _handle;
};
class Manager
{
public:
Manager()
{
_mainService = std::make_shared<boost::asio::io_service>();
_workerService = std::make_shared<boost::asio::io_service>();
_work = std::make_shared<boost::asio::io_service::work>(*_workerService);
_threadStarted = false;
Start();
};
~Manager() {};
void WorkerMain()
{
_workerService->poll();
}
void Start()
{
if (_threadStarted) return;
_workerThread = std::thread(&Manager::WorkerMain, this);
_threadStarted = true;
}
void Stop()
{
if (_threadStarted == false) return;
_mainService->stop();
_workerThread.join();
_mainService.reset();
}
void OnSignalFetchCompleted(int value)
{
int asdf = 0; //do stuff with data on main thread
}
void ProcessData(signal_type& signal)
{
int i = 0;
do
{
_queue.consume_one([&](std::shared_ptr<Task> task)
{
task->Execute();
//get data from task; send out signal with data
});
i++;
} while (i < 3);
}
void QueueData(int handle)
{
_signalFetchCompleted.connect(boost::bind(&Manager::OnSignalFetchCompleted, this, _1));
_workerService->post(boost::bind(&Manager::ProcessData, boost::ref(_signalFetchCompleted))); //!!does not compile
std::shared_ptr<Task> task = std::make_shared<Task>(handle);
_queue.push(task);
}
private:
boost::lockfree::spsc_queue<std::shared_ptr<Task>, boost::lockfree::capacity<1024>> _queue;
std::thread _workerThread;
bool _threadStarted;
std::shared_ptr<boost::asio::io_service> _mainService;
std::shared_ptr<boost::asio::io_service> _workerService;
std::shared_ptr<boost::asio::io_service::work> _work;
signal_type _signalFetchCompleted;
};
int _tmain(int argc, _TCHAR* argv[])
{
std::shared_ptr<Manager> mgr = std::make_shared<Manager>();
mgr->QueueData(5);
mgr->QueueData(10);
mgr->Stop();
return 0;
}
我在 _workerService->Post 行遇到一个我无法解决的编译错误:
1>C:\Boost\boost/bind/mem_fn.hpp(333): error C2784: 'T *boost::get_pointer(const boost::scoped_ptr<T> &)' : could not deduce template argument for 'const boost::scoped_ptr<T> &' from 'const signal_type'
1> C:\Boost\boost/smart_ptr/scoped_ptr.hpp(150) : see declaration of 'boost::get_pointer'
1> C:\Boost\boost/bind/mem_fn.hpp(352) : see reference to function template instantiation 'R (__cdecl &boost::_mfi::dm<R,Manager>::call<const U>(U &,const void *) const)' being compiled
1> with
1> [
1> R=void (signal_type &)
1> , U=signal_type
1> ]
1> C:\Boost\boost/bind/mem_fn.hpp(352) : see reference to function template instantiation 'R (__cdecl &boost::_mfi::dm<R,Manager>::call<const U>(U &,const void *) const)' being compiled
1> with
1> [
1> R=void (signal_type &)
1> , U=signal_type
1> ]
1> C:\Boost\boost/bind/bind.hpp(243) : see reference to function template instantiation 'R (__cdecl &boost::_mfi::dm<R,Manager>::operator ()<T>(const U &) const)' being compiled
1> with
1> [
1> R=void (signal_type &)
1> , T=signal_type
1> , U=signal_type
1> ]
对于解决此编译错误的任何帮助或对此方法的一般评论,我们将不胜感激。谢谢。
编译器错误似乎是在谈论您构建了一个 boost::asio::io_service::work
对象并且您向其传递了不正确的参数:
error C2664: 'boost::asio::io_service::work::work(const boost::asio::io_service::work &)' : cannot convert argument 1 from 'std::shared_ptr<boost::asio::io_service>' to 'boost::asio::io_service &'
boost::asio::io_service::work
有一个构造函数,它接受一个 boost::asio::io_service&
和一个复制构造函数;但是,您传递的是 std::shared_ptr<boost::asio::io_service>
:
_work = std::make_shared<boost::asio::io_service::work>(_workerService);
在这里,_workerService
是一个std::shared_ptr<boost::asio::io_service>
,但是你需要一个boost::asio::io_service&
。请尝试以下操作:
_work = std::make_shared<boost::asio::io_service::work>(*_workerService);
我认为 boost::asio 不是您任务的最佳解决方案。你读过条件变量吗?它们要简单得多,可用于实现您的目标。
根据新信息,问题出在您的 boost::bind
上。您正在尝试调用一个没有对象的成员函数来调用它:您正在尝试调用 ProcessData
但您没有告诉绑定您希望在哪个对象上调用它。你需要给它一个 Manager
来调用它:
_workerService->post(boost::bind(&Manager::ProcessData, this, boost::ref(_signalFetchCompleted)));
这将在 this
上调用 ProcessData
并传入对 _signalFetchCompleted