asio::io_service 和 thread_group 生命周期问题
asio::io_service and thread_group lifecycle issue
查看 answers like this one,我们可以做如下事情:
boost::asio::io_service ioService;
boost::thread_group threadpool;
{
boost::asio::io_service::work work(ioService);
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, ioService));
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
ioService.post(boost::bind(...));
ioService.post(boost::bind(...));
ioService.post(boost::bind(...));
}
threadpool.join_all();
但是,在我的例子中,我想做类似的事情:
while (condition)
{
ioService.post(boost::bind(...));
ioService.post(boost::bind(...));
ioService.post(boost::bind(...));
threadpool.join_all();
// DO SOMETHING WITH RESULTS
}
但是,boost::asio::io_service::work work(ioService)
行不合适,据我所知,如果不重新创建池中的每个线程,我无法重新创建它。
在我的代码中,创建线程的开销似乎可以忽略不计(而且实际上比以前基于互斥的代码性能更好),但是有没有更简洁的方法来做到这一点?
while (condition)
{
//... stuff
threadpool.join_all();
//...
}
没有任何意义,因为您只能加入线程一次。一旦加入,他们就消失了。您不想一直启动新线程(使用线程池 + 任务队列¹)。
由于您不想真正停止线程,您可能不想破坏工作。如果你坚持,shared_ptr<work>
或 optional<work>
效果很好(只是 my_work.reset()
它)
¹ 更新 建议:
- 带有任务队列的简单
thread_pool
:(在 boost thread throwing exception "thread_resource_error: resource temporarily unavailable" 中)
- 基于
io_service
本身的队列(使用work
)c++ work queues with blocking
更新
对 "SOLUTION #2" 的简单扩展可以等待所有任务完成,而无需加入 workers/destroying 池:
void drain() {
unique_lock<mutex> lk(mx);
namespace phx = boost::phoenix;
cv.wait(lk, phx::empty(phx::ref(_queue)));
}
请注意,为了可靠运行,还需要在出队时发出条件变量信号:
cv.notify_all(); // in order to signal drain
注意事项
这是一个邀请竞争条件的接口(队列可以接受来自多个线程的作业,所以一旦 drain()
returns,另一个线程可能已经发布了一个新任务)
当队列为空时发出信号,而不是在任务完成时发出信号。队列无法知道这一点,如果您需要,请使用任务中的 barrier/signal 条件(本例中为 the_work
)。 queuing/scheduling 的机制与此处无关。
演示
#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
using namespace boost;
using namespace boost::phoenix::arg_names;
class thread_pool
{
private:
mutex mx;
condition_variable cv;
typedef function<void()> job_t;
std::deque<job_t> _queue;
thread_group pool;
boost::atomic_bool shutdown;
static void worker_thread(thread_pool& q)
{
while (auto job = q.dequeue())
(*job)();
}
public:
thread_pool() : shutdown(false) {
for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
pool.create_thread(bind(worker_thread, ref(*this)));
}
void enqueue(job_t job)
{
lock_guard<mutex> lk(mx);
_queue.push_back(std::move(job));
cv.notify_one();
}
void drain() {
unique_lock<mutex> lk(mx);
namespace phx = boost::phoenix;
cv.wait(lk, phx::empty(phx::ref(_queue)));
}
optional<job_t> dequeue()
{
unique_lock<mutex> lk(mx);
namespace phx = boost::phoenix;
cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));
if (_queue.empty())
return none;
auto job = std::move(_queue.front());
_queue.pop_front();
cv.notify_all(); // in order to signal drain
return std::move(job);
}
~thread_pool()
{
shutdown = true;
{
lock_guard<mutex> lk(mx);
cv.notify_all();
}
pool.join_all();
}
};
void the_work(int id)
{
std::cout << "worker " << id << " entered\n";
// no more synchronization; the pool size determines max concurrency
std::cout << "worker " << id << " start work\n";
this_thread::sleep_for(chrono::milliseconds(2));
std::cout << "worker " << id << " done\n";
}
int main()
{
thread_pool pool; // uses 1 thread per core
for (auto i = 0ull; i < 20; ++i) {
for (int i = 0; i < 10; ++i)
pool.enqueue(bind(the_work, i));
pool.drain(); // make the queue empty, leave the threads
std::cout << "Queue empty\n";
}
// destructing pool joins the worker threads
}
查看 answers like this one,我们可以做如下事情:
boost::asio::io_service ioService;
boost::thread_group threadpool;
{
boost::asio::io_service::work work(ioService);
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, ioService));
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
ioService.post(boost::bind(...));
ioService.post(boost::bind(...));
ioService.post(boost::bind(...));
}
threadpool.join_all();
但是,在我的例子中,我想做类似的事情:
while (condition)
{
ioService.post(boost::bind(...));
ioService.post(boost::bind(...));
ioService.post(boost::bind(...));
threadpool.join_all();
// DO SOMETHING WITH RESULTS
}
但是,boost::asio::io_service::work work(ioService)
行不合适,据我所知,如果不重新创建池中的每个线程,我无法重新创建它。
在我的代码中,创建线程的开销似乎可以忽略不计(而且实际上比以前基于互斥的代码性能更好),但是有没有更简洁的方法来做到这一点?
while (condition)
{
//... stuff
threadpool.join_all();
//...
}
没有任何意义,因为您只能加入线程一次。一旦加入,他们就消失了。您不想一直启动新线程(使用线程池 + 任务队列¹)。
由于您不想真正停止线程,您可能不想破坏工作。如果你坚持,shared_ptr<work>
或 optional<work>
效果很好(只是 my_work.reset()
它)
¹ 更新 建议:
- 带有任务队列的简单
thread_pool
:(在 boost thread throwing exception "thread_resource_error: resource temporarily unavailable" 中) - 基于
io_service
本身的队列(使用work
)c++ work queues with blocking
更新
对 "SOLUTION #2" 的简单扩展可以等待所有任务完成,而无需加入 workers/destroying 池:
void drain() {
unique_lock<mutex> lk(mx);
namespace phx = boost::phoenix;
cv.wait(lk, phx::empty(phx::ref(_queue)));
}
请注意,为了可靠运行,还需要在出队时发出条件变量信号:
cv.notify_all(); // in order to signal drain
注意事项
这是一个邀请竞争条件的接口(队列可以接受来自多个线程的作业,所以一旦
drain()
returns,另一个线程可能已经发布了一个新任务)当队列为空时发出信号,而不是在任务完成时发出信号。队列无法知道这一点,如果您需要,请使用任务中的 barrier/signal 条件(本例中为
the_work
)。 queuing/scheduling 的机制与此处无关。
演示
#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
using namespace boost;
using namespace boost::phoenix::arg_names;
class thread_pool
{
private:
mutex mx;
condition_variable cv;
typedef function<void()> job_t;
std::deque<job_t> _queue;
thread_group pool;
boost::atomic_bool shutdown;
static void worker_thread(thread_pool& q)
{
while (auto job = q.dequeue())
(*job)();
}
public:
thread_pool() : shutdown(false) {
for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
pool.create_thread(bind(worker_thread, ref(*this)));
}
void enqueue(job_t job)
{
lock_guard<mutex> lk(mx);
_queue.push_back(std::move(job));
cv.notify_one();
}
void drain() {
unique_lock<mutex> lk(mx);
namespace phx = boost::phoenix;
cv.wait(lk, phx::empty(phx::ref(_queue)));
}
optional<job_t> dequeue()
{
unique_lock<mutex> lk(mx);
namespace phx = boost::phoenix;
cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));
if (_queue.empty())
return none;
auto job = std::move(_queue.front());
_queue.pop_front();
cv.notify_all(); // in order to signal drain
return std::move(job);
}
~thread_pool()
{
shutdown = true;
{
lock_guard<mutex> lk(mx);
cv.notify_all();
}
pool.join_all();
}
};
void the_work(int id)
{
std::cout << "worker " << id << " entered\n";
// no more synchronization; the pool size determines max concurrency
std::cout << "worker " << id << " start work\n";
this_thread::sleep_for(chrono::milliseconds(2));
std::cout << "worker " << id << " done\n";
}
int main()
{
thread_pool pool; // uses 1 thread per core
for (auto i = 0ull; i < 20; ++i) {
for (int i = 0; i < 10; ++i)
pool.enqueue(bind(the_work, i));
pool.drain(); // make the queue empty, leave the threads
std::cout << "Queue empty\n";
}
// destructing pool joins the worker threads
}