提升 asio 优先级队列,从处理程序添加异步操作
boost asio priority queue, add async operation from handler
我对基于 boost asio 的优先级队列示例的设计有疑问。如果我从处理程序中添加一个包装的处理程序,它似乎会丢失:
我按原样使用了所有内容,并将 main() 函数替换为以下代码:
//
// based on prioritised_handlers.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/asio.hpp>
#include <boost/function.hpp>
#include <iostream>
#include <queue>
using boost::asio::ip::tcp;
class handler_priority_queue
{
public:
void add(int priority, boost::function<void()> function)
{
handlers_.push(queued_handler(priority, function));
}
void execute_all()
{
while (!handlers_.empty())
{
queued_handler handler = handlers_.top();
handler.execute();
handlers_.pop();
}
}
// A generic wrapper class for handlers to allow the invocation to be hooked.
template <typename Handler>
class wrapped_handler
{
public:
wrapped_handler(handler_priority_queue& q, int p, Handler h)
: queue_(q), priority_(p), handler_(h)
{
}
void operator()()
{
handler_();
}
template <typename Arg1>
void operator()(Arg1 arg1)
{
handler_(arg1);
}
template <typename Arg1, typename Arg2>
void operator()(Arg1 arg1, Arg2 arg2)
{
handler_(arg1, arg2);
}
//private:
handler_priority_queue& queue_;
int priority_;
Handler handler_;
};
template <typename Handler>
wrapped_handler<Handler> wrap(int priority, Handler handler)
{
return wrapped_handler<Handler>(*this, priority, handler);
}
private:
class queued_handler
{
public:
queued_handler(int p, boost::function<void()> f)
: priority_(p), function_(f)
{
}
void execute()
{
function_();
}
friend bool operator<(const queued_handler& a,
const queued_handler& b)
{
return a.priority_ < b.priority_;
}
private:
int priority_;
boost::function<void()> function_;
};
std::priority_queue<queued_handler> handlers_;
};
// Custom invocation hook for wrapped handlers.
template <typename Function, typename Handler>
void asio_handler_invoke(Function f,
handler_priority_queue::wrapped_handler<Handler>* h)
{
h->queue_.add(h->priority_, f);
}
void low_priority_handler()
{
std::cout << "Low priority handler\n";
}
int main()
{
//
// BASED ON prioritised_handlers.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2011 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
//----------------------------------------------------------------------
using boost::asio::ip::tcp;
boost::asio::io_service io_service;
handler_priority_queue pri_queue;
// Post a completion handler to be run immediately.
io_service.post(pri_queue.wrap(0, low_priority_handler));
// Set a deadline timer to expire immediately.
boost::asio::deadline_timer timer1(io_service);
timer1.expires_at(boost::posix_time::neg_infin);
timer1.async_wait(pri_queue.wrap(42, [](const boost::system::error_code& )
{
std::cout << "now" << std::endl;
}));
// Set a deadline timer to expire later.
boost::asio::deadline_timer timer2(io_service, boost::posix_time::milliseconds(100));
boost::asio::deadline_timer timer3(io_service, boost::posix_time::milliseconds(200));
timer2.async_wait(pri_queue.wrap(100, [&pri_queue, &timer3](const boost::system::error_code& )
{
std::cout << "100ms" << std::endl;
timer3.async_wait(pri_queue.wrap(100, [](const boost::system::error_code& )
{
std::cout << "200ms" << std::endl;
}));
}));
while (io_service.run_one())
{
// The custom invocation hook adds the handlers to the priority queue
// rather than executing them from within the poll_one() call.
while (io_service.poll_one())
;
pri_queue.execute_all();
}
}
//g++ -std=c++14 -Wall -Werror -rdynamic -lboost_system -lboost_thread -lboost_log -lpthread prioritised_handlers.cpp
这会打印:
now
Low priority handler
100ms
定时器 3 的 200 毫秒打印输出丢失了。根据我的 printf 调试方法,自定义调用挂钩 asio_handler_invoke() 永远不会为“200ms”操作调用。不幸的是我不明白为什么。
上述方法有什么问题?
更新 在 Technik
提示后包含示例其余部分的代码
我认为 Christopher 的实现有一个错误:
void execute_all()
{
while (!handlers_.empty())
{
queued_handler handler = handlers_.top();
handler.execute(); //point1
handlers_.pop(); //point2
}
}
如果您在 point1 向 std::priority_queue
添加新元素,这是相同的优先级(在您的情况下为 100)- 可能会添加处理程序到堆的顶部。在这种情况下,point2 上的 pop()
将在不执行的情况下弹出我们的新处理程序。在这种情况下,您的 io_service
循环将停止,因为没有更多的工作。
我想你可以换行 point1
和 point2
来解决这个问题。
任何代码都没有问题,除了您保持(或更确切地说不保持)应用程序活动的方式足以让最后一个处理程序完成。
您是 运行 io_service,轮询和 运行 每项工作。所有作业都得到处理,但它们是延迟的异步调用。您指望 io_service 调用使应用程序保持活动状态,但就 io_service 而言,所有工作都已完成,因此它停止阻塞,您的 main() 函数退出并且您永远看不到最终处理程序的输出。
为了向您证明这一点,只需将 io_service
包装在一个 ::work
对象中,这将防止 io_service
永远认为这是没有工作的,因此总是块。
boost::asio::io_service::work w(io_service);
将此添加到 while (io_service.run_one())
行之前。
我对基于 boost asio 的优先级队列示例的设计有疑问。如果我从处理程序中添加一个包装的处理程序,它似乎会丢失:
我按原样使用了所有内容,并将 main() 函数替换为以下代码:
//
// based on prioritised_handlers.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/asio.hpp>
#include <boost/function.hpp>
#include <iostream>
#include <queue>
using boost::asio::ip::tcp;
class handler_priority_queue
{
public:
void add(int priority, boost::function<void()> function)
{
handlers_.push(queued_handler(priority, function));
}
void execute_all()
{
while (!handlers_.empty())
{
queued_handler handler = handlers_.top();
handler.execute();
handlers_.pop();
}
}
// A generic wrapper class for handlers to allow the invocation to be hooked.
template <typename Handler>
class wrapped_handler
{
public:
wrapped_handler(handler_priority_queue& q, int p, Handler h)
: queue_(q), priority_(p), handler_(h)
{
}
void operator()()
{
handler_();
}
template <typename Arg1>
void operator()(Arg1 arg1)
{
handler_(arg1);
}
template <typename Arg1, typename Arg2>
void operator()(Arg1 arg1, Arg2 arg2)
{
handler_(arg1, arg2);
}
//private:
handler_priority_queue& queue_;
int priority_;
Handler handler_;
};
template <typename Handler>
wrapped_handler<Handler> wrap(int priority, Handler handler)
{
return wrapped_handler<Handler>(*this, priority, handler);
}
private:
class queued_handler
{
public:
queued_handler(int p, boost::function<void()> f)
: priority_(p), function_(f)
{
}
void execute()
{
function_();
}
friend bool operator<(const queued_handler& a,
const queued_handler& b)
{
return a.priority_ < b.priority_;
}
private:
int priority_;
boost::function<void()> function_;
};
std::priority_queue<queued_handler> handlers_;
};
// Custom invocation hook for wrapped handlers.
template <typename Function, typename Handler>
void asio_handler_invoke(Function f,
handler_priority_queue::wrapped_handler<Handler>* h)
{
h->queue_.add(h->priority_, f);
}
void low_priority_handler()
{
std::cout << "Low priority handler\n";
}
int main()
{
//
// BASED ON prioritised_handlers.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2011 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
//----------------------------------------------------------------------
using boost::asio::ip::tcp;
boost::asio::io_service io_service;
handler_priority_queue pri_queue;
// Post a completion handler to be run immediately.
io_service.post(pri_queue.wrap(0, low_priority_handler));
// Set a deadline timer to expire immediately.
boost::asio::deadline_timer timer1(io_service);
timer1.expires_at(boost::posix_time::neg_infin);
timer1.async_wait(pri_queue.wrap(42, [](const boost::system::error_code& )
{
std::cout << "now" << std::endl;
}));
// Set a deadline timer to expire later.
boost::asio::deadline_timer timer2(io_service, boost::posix_time::milliseconds(100));
boost::asio::deadline_timer timer3(io_service, boost::posix_time::milliseconds(200));
timer2.async_wait(pri_queue.wrap(100, [&pri_queue, &timer3](const boost::system::error_code& )
{
std::cout << "100ms" << std::endl;
timer3.async_wait(pri_queue.wrap(100, [](const boost::system::error_code& )
{
std::cout << "200ms" << std::endl;
}));
}));
while (io_service.run_one())
{
// The custom invocation hook adds the handlers to the priority queue
// rather than executing them from within the poll_one() call.
while (io_service.poll_one())
;
pri_queue.execute_all();
}
}
//g++ -std=c++14 -Wall -Werror -rdynamic -lboost_system -lboost_thread -lboost_log -lpthread prioritised_handlers.cpp
这会打印:
now
Low priority handler
100ms
定时器 3 的 200 毫秒打印输出丢失了。根据我的 printf 调试方法,自定义调用挂钩 asio_handler_invoke() 永远不会为“200ms”操作调用。不幸的是我不明白为什么。
上述方法有什么问题?
更新 在 Technik
提示后包含示例其余部分的代码我认为 Christopher 的实现有一个错误:
void execute_all()
{
while (!handlers_.empty())
{
queued_handler handler = handlers_.top();
handler.execute(); //point1
handlers_.pop(); //point2
}
}
如果您在 point1 向 std::priority_queue
添加新元素,这是相同的优先级(在您的情况下为 100)- 可能会添加处理程序到堆的顶部。在这种情况下,point2 上的 pop()
将在不执行的情况下弹出我们的新处理程序。在这种情况下,您的 io_service
循环将停止,因为没有更多的工作。
我想你可以换行 point1
和 point2
来解决这个问题。
任何代码都没有问题,除了您保持(或更确切地说不保持)应用程序活动的方式足以让最后一个处理程序完成。
您是 运行 io_service,轮询和 运行 每项工作。所有作业都得到处理,但它们是延迟的异步调用。您指望 io_service 调用使应用程序保持活动状态,但就 io_service 而言,所有工作都已完成,因此它停止阻塞,您的 main() 函数退出并且您永远看不到最终处理程序的输出。
为了向您证明这一点,只需将 io_service
包装在一个 ::work
对象中,这将防止 io_service
永远认为这是没有工作的,因此总是块。
boost::asio::io_service::work w(io_service);
将此添加到 while (io_service.run_one())
行之前。