boost:asio::read 或 boost:asio::async_read 超时
boost:asio::read or boost:asio::async_read with timeout
是的。我知道在 boost::asio
中有一些关于此 time_out
的问题。我的问题对于 asio
的人来说可能太简单了,无法在这里解决。
我在 TCP 协议上使用 boost::asio
以尽可能快的速度通过网络连续读取数据。
以下函数 ReadData()
在 while 循环中从工作人员 std::thread
连续调用。
std::size_t ReadData(std::vector<unsigned char> & buffer, unsigned int size_to_read) {
boost::system::error_code error_code;
buffer.resize(size_to_read);
// Receive body
std::size_t bytes_read = boost::asio::read(*m_socket, boost::asio::buffer(buffer), error_code);
if (bytes_read == 0) {
// log error
return;
}
return bytes_read;
}
它工作正常。 Returns 数据。一切顺利
我只想为boost::asio::read
使用time_out。我了解到我需要使用 boost::asio::async_read
和 boost::asio::async_wait
才能使 time_out 技术起作用。
一个boost example建议使用boost::asio::async_read_until
?
我应该使用 boost::asio::async_read
还是 boost::asio::async_read_until
?
我用boost::asio::async_read
还是boost::asio::async_read_until
还是boost::asio::read
都没关系。但是我希望 asio::read
调用在对我的方法 ReadData
的调用中被触发并完成,这样客户端代码就不会受到影响。
我怎样才能做到这一点?请推荐
好的,这样的东西应该适合你的目的:
理由:
您似乎想使用阻塞操作。既然是这种情况,您很可能没有 运行 线程来抽取 io 循环。
所以我们在套接字的 io 循环上启动两个同步异步任务,然后生成一个线程:
a) 重置(实际上是重新启动)循环以防它已经耗尽
b) 运行 循环到耗尽(我们可以在这里更聪明,只 运行 它直到处理程序指示已经满足某些条件,但这是另一天的教训)
#include <type_traits>
template<class Stream, class ConstBufferSequence, class Handler>
auto async_read_with_timeout(Stream& stream, ConstBufferSequence&& sequence, std::size_t millis, Handler&& handler)
{
using handler_type = std::decay_t<Handler>;
using buffer_sequence_type = std::decay_t<ConstBufferSequence>;
using stream_type = Stream;
struct state_machine : std::enable_shared_from_this<state_machine>
{
state_machine(stream_type& stream, buffer_sequence_type sequence, handler_type handler)
: stream_(stream)
, sequence_(std::move(sequence))
, handler_(std::move(handler))
{}
void start(std::size_t millis)
{
timer_.expires_from_now(boost::posix_time::milliseconds(millis));
timer_.async_wait(strand_.wrap([self = this->shared_from_this()](auto&& ec) {
self->handle_timeout(ec);
}));
boost::asio::async_read(stream_, sequence_,
strand_.wrap([self = this->shared_from_this()](auto&& ec, auto size){
self->handle_read(ec, size);
}));
}
void handle_timeout(boost::system::error_code const& ec)
{
if (not ec and not completed_)
{
boost::system::error_code sink;
stream_.cancel(sink);
}
}
void handle_read(boost::system::error_code const& ec, std::size_t size)
{
assert(not completed_);
boost::system::error_code sink;
timer_.cancel(sink);
completed_ = true;
handler_(ec, size);
}
stream_type& stream_;
buffer_sequence_type sequence_;
handler_type handler_;
boost::asio::io_service::strand strand_ { stream_.get_io_service() };
boost::asio::deadline_timer timer_ { stream_.get_io_service() };
bool completed_ = false;
};
auto psm = std::make_shared<state_machine>(stream,
std::forward<ConstBufferSequence>(sequence),
std::forward<Handler>(handler));
psm->start(millis);
}
std::size_t ReadData(boost::asio::ip::tcp::socket& socket,
std::vector<unsigned char> & buffer,
unsigned int size_to_read,
boost::system::error_code& ec) {
buffer.resize(size_to_read);
ec.clear();
std::size_t bytes_read = 0;
auto& executor = socket.get_io_service();
async_read_with_timeout(socket, boost::asio::buffer(buffer),
2000, // 2 seconds for example
[&](auto&& err, auto size){
ec = err;
bytes_read = size;
});
// todo: use a more scalable executor than spawning threads
auto future = std::async(std::launch::async, [&] {
if (executor.stopped()) {
executor.reset();
}
executor.run();
});
future.wait();
return bytes_read;
}
是的。我知道在 boost::asio
中有一些关于此 time_out
的问题。我的问题对于 asio
的人来说可能太简单了,无法在这里解决。
我在 TCP 协议上使用 boost::asio
以尽可能快的速度通过网络连续读取数据。
以下函数 ReadData()
在 while 循环中从工作人员 std::thread
连续调用。
std::size_t ReadData(std::vector<unsigned char> & buffer, unsigned int size_to_read) {
boost::system::error_code error_code;
buffer.resize(size_to_read);
// Receive body
std::size_t bytes_read = boost::asio::read(*m_socket, boost::asio::buffer(buffer), error_code);
if (bytes_read == 0) {
// log error
return;
}
return bytes_read;
}
它工作正常。 Returns 数据。一切顺利
我只想为boost::asio::read
使用time_out。我了解到我需要使用 boost::asio::async_read
和 boost::asio::async_wait
才能使 time_out 技术起作用。
一个boost example建议使用boost::asio::async_read_until
?
我应该使用 boost::asio::async_read
还是 boost::asio::async_read_until
?
我用boost::asio::async_read
还是boost::asio::async_read_until
还是boost::asio::read
都没关系。但是我希望 asio::read
调用在对我的方法 ReadData
的调用中被触发并完成,这样客户端代码就不会受到影响。
我怎样才能做到这一点?请推荐
好的,这样的东西应该适合你的目的:
理由:
您似乎想使用阻塞操作。既然是这种情况,您很可能没有 运行 线程来抽取 io 循环。
所以我们在套接字的 io 循环上启动两个同步异步任务,然后生成一个线程:
a) 重置(实际上是重新启动)循环以防它已经耗尽
b) 运行 循环到耗尽(我们可以在这里更聪明,只 运行 它直到处理程序指示已经满足某些条件,但这是另一天的教训)
#include <type_traits>
template<class Stream, class ConstBufferSequence, class Handler>
auto async_read_with_timeout(Stream& stream, ConstBufferSequence&& sequence, std::size_t millis, Handler&& handler)
{
using handler_type = std::decay_t<Handler>;
using buffer_sequence_type = std::decay_t<ConstBufferSequence>;
using stream_type = Stream;
struct state_machine : std::enable_shared_from_this<state_machine>
{
state_machine(stream_type& stream, buffer_sequence_type sequence, handler_type handler)
: stream_(stream)
, sequence_(std::move(sequence))
, handler_(std::move(handler))
{}
void start(std::size_t millis)
{
timer_.expires_from_now(boost::posix_time::milliseconds(millis));
timer_.async_wait(strand_.wrap([self = this->shared_from_this()](auto&& ec) {
self->handle_timeout(ec);
}));
boost::asio::async_read(stream_, sequence_,
strand_.wrap([self = this->shared_from_this()](auto&& ec, auto size){
self->handle_read(ec, size);
}));
}
void handle_timeout(boost::system::error_code const& ec)
{
if (not ec and not completed_)
{
boost::system::error_code sink;
stream_.cancel(sink);
}
}
void handle_read(boost::system::error_code const& ec, std::size_t size)
{
assert(not completed_);
boost::system::error_code sink;
timer_.cancel(sink);
completed_ = true;
handler_(ec, size);
}
stream_type& stream_;
buffer_sequence_type sequence_;
handler_type handler_;
boost::asio::io_service::strand strand_ { stream_.get_io_service() };
boost::asio::deadline_timer timer_ { stream_.get_io_service() };
bool completed_ = false;
};
auto psm = std::make_shared<state_machine>(stream,
std::forward<ConstBufferSequence>(sequence),
std::forward<Handler>(handler));
psm->start(millis);
}
std::size_t ReadData(boost::asio::ip::tcp::socket& socket,
std::vector<unsigned char> & buffer,
unsigned int size_to_read,
boost::system::error_code& ec) {
buffer.resize(size_to_read);
ec.clear();
std::size_t bytes_read = 0;
auto& executor = socket.get_io_service();
async_read_with_timeout(socket, boost::asio::buffer(buffer),
2000, // 2 seconds for example
[&](auto&& err, auto size){
ec = err;
bytes_read = size;
});
// todo: use a more scalable executor than spawning threads
auto future = std::async(std::launch::async, [&] {
if (executor.stopped()) {
executor.reset();
}
executor.run();
});
future.wait();
return bytes_read;
}