boost::asio::io_context::run_one_for() 无法发送大缓冲区
boost::asio::io_context::run_one_for() fails to send large buffer
我需要同步 I/O 但具有以下功能:
- 被其他线程中断
- 支持超时
因此,我使用 Boost.Asio 中的异步 I/O 并通过 boost::asio::io_context::run_one_for() 执行它。
有这样的解析、连接、读写实现的例子。
long const DefaultTimeout = 50;
namespace asio = boost::asio;
using boost::asio::ip::tcp;
template <typename T>
void PerformIO(T &Object, long Timeout)
{
auto &Context = Object.get_executor().context();
Context.reset();
if (!Context.run_one_for(std::chrono::seconds(Timeout)))
throw std::exception("I/O operation was timed out");
}
template <typename T, typename TSuccessFlag>
void PerformIO(T &Object, long Timeout, TSuccessFlag &Success)
{
PerformIO(Object, Timeout);
boost::this_thread::interruption_point();
if (!Success)
throw std::exception("I/O operation was not successful");
}
tcp::resolver::results_type Resolve(tcp::resolver &Resolver, std::string const &Host, std::string const &Port)
{
bool Resolved = false;
tcp::resolver::results_type Endpoints;
Resolver.async_resolve(Host, Port,
[&](boost::system::error_code const &Error, boost::asio::ip::tcp::resolver::results_type Results){ Endpoints = Results; Resolved = true; });
PerformIO(Resolver, DefaultTimeout, Resolved);
if (Endpoints.begin() == Endpoints.end())
throw std::exception("Not resolved");
return Endpoints;
}
template <typename T>
void Connect(tcp::socket &Socket, T const &Endpoints)
{
bool Connected = false;
asio::async_connect(Socket, Endpoints,
[&](boost::system::error_code const &Error, boost::asio::ip::tcp::endpoint const &Endpoint){ Connected = true; });
PerformIO(Socket, DefaultTimeout, Connected);
}
template <typename T>
size_t ReadSome(tcp::socket &Socket, T &Buffers)
{
size_t Bytes = 0;
asio::async_read(Socket, Buffers, asio::transfer_at_least(1),
[&](boost::system::error_code const &Error, std::size_t BytesTransferred){ Bytes += BytesTransferred; });
PerformIO(Socket, DefaultTimeout, Bytes);
return Bytes;
}
template <typename T>
size_t Write(tcp::socket &Socket, T &Buffers)
{
size_t Bytes = 0;
asio::async_write(Socket, Buffers,
[&](boost::system::error_code const &Error, std::size_t BytesTransferred){ Bytes += BytesTransferred; });
PerformIO(Socket, DefaultTimeout, Bytes);
return Bytes;
}
它适用于小数据,但如果我尝试像这样发送大数据缓冲区,它就会失败:
// tcp::socket Socket;
// char const *Buffer;
// size_t BufferSize = 1000000;
Write(Socket, asio::buffer(Buffer, BufferSize))
我认为原因是 boost::asio::io_context::run_one_for() 的实现。就是这样(提升 1.67):
template <typename Rep, typename Period>
std::size_t io_context::run_one_for(
const chrono::duration<Rep, Period>& rel_time)
{
return this->run_one_until(chrono::steady_clock::now() + rel_time);
}
template <typename Clock, typename Duration>
std::size_t io_context::run_one_until(
const chrono::time_point<Clock, Duration>& abs_time)
{
typename Clock::time_point now = Clock::now();
while (now < abs_time)
{
typename Clock::duration rel_time = abs_time - now;
if (rel_time > chrono::seconds(1))
rel_time = chrono::seconds(1);
boost::system::error_code ec;
std::size_t s = impl_.wait_one(
static_cast<long>(chrono::duration_cast<
chrono::microseconds>(rel_time).count()), ec);
boost::asio::detail::throw_error(ec);
if (s || impl_.stopped())
return s;
now = Clock::now();
}
return 0;
}
注意 1 秒的限制。我认为发送大缓冲区需要超过 1 秒并且执行停止。我可以按 64k 字节顺序执行 Write()
并且它有效。但我认为如果连接无法每秒发送 64k 字节,它将失败。所以,我需要一个更好的解决方案。
I think the reason is the implementation of boost::asio::io_context::run_one_for(). There it is (boost 1.67):
没有。原因是 boost::asio::[async_][read|write][_.*]
是组合操作。 run_one[_*]
只有 运行 一个处理程序,因此它不会完成一个组合的异步操作。
另请注意,boost::asio::write
是一个同步操作,与周围的任何逻辑无关,因此您的超时甚至不会起作用。
您可以将异步操作包装在本地 io_context 中并 运行 使用截止日期完成(因此,run_for
而不是 run_one_for
)。
您可以从 How to simulate boost::asio::write with a timeout 中收集到类似的概念 - 尽管使用 run_for(...)
比 run_one()
严格(很多)更好(出于上面解释的原因。在我写另一个答案的时候,run_for()
不存在,我可能不得不更新那个答案,因为那时我可能错过了组合操作的问题。
我需要同步 I/O 但具有以下功能:
- 被其他线程中断
- 支持超时
因此,我使用 Boost.Asio 中的异步 I/O 并通过 boost::asio::io_context::run_one_for() 执行它。 有这样的解析、连接、读写实现的例子。
long const DefaultTimeout = 50;
namespace asio = boost::asio;
using boost::asio::ip::tcp;
template <typename T>
void PerformIO(T &Object, long Timeout)
{
auto &Context = Object.get_executor().context();
Context.reset();
if (!Context.run_one_for(std::chrono::seconds(Timeout)))
throw std::exception("I/O operation was timed out");
}
template <typename T, typename TSuccessFlag>
void PerformIO(T &Object, long Timeout, TSuccessFlag &Success)
{
PerformIO(Object, Timeout);
boost::this_thread::interruption_point();
if (!Success)
throw std::exception("I/O operation was not successful");
}
tcp::resolver::results_type Resolve(tcp::resolver &Resolver, std::string const &Host, std::string const &Port)
{
bool Resolved = false;
tcp::resolver::results_type Endpoints;
Resolver.async_resolve(Host, Port,
[&](boost::system::error_code const &Error, boost::asio::ip::tcp::resolver::results_type Results){ Endpoints = Results; Resolved = true; });
PerformIO(Resolver, DefaultTimeout, Resolved);
if (Endpoints.begin() == Endpoints.end())
throw std::exception("Not resolved");
return Endpoints;
}
template <typename T>
void Connect(tcp::socket &Socket, T const &Endpoints)
{
bool Connected = false;
asio::async_connect(Socket, Endpoints,
[&](boost::system::error_code const &Error, boost::asio::ip::tcp::endpoint const &Endpoint){ Connected = true; });
PerformIO(Socket, DefaultTimeout, Connected);
}
template <typename T>
size_t ReadSome(tcp::socket &Socket, T &Buffers)
{
size_t Bytes = 0;
asio::async_read(Socket, Buffers, asio::transfer_at_least(1),
[&](boost::system::error_code const &Error, std::size_t BytesTransferred){ Bytes += BytesTransferred; });
PerformIO(Socket, DefaultTimeout, Bytes);
return Bytes;
}
template <typename T>
size_t Write(tcp::socket &Socket, T &Buffers)
{
size_t Bytes = 0;
asio::async_write(Socket, Buffers,
[&](boost::system::error_code const &Error, std::size_t BytesTransferred){ Bytes += BytesTransferred; });
PerformIO(Socket, DefaultTimeout, Bytes);
return Bytes;
}
它适用于小数据,但如果我尝试像这样发送大数据缓冲区,它就会失败:
// tcp::socket Socket;
// char const *Buffer;
// size_t BufferSize = 1000000;
Write(Socket, asio::buffer(Buffer, BufferSize))
我认为原因是 boost::asio::io_context::run_one_for() 的实现。就是这样(提升 1.67):
template <typename Rep, typename Period>
std::size_t io_context::run_one_for(
const chrono::duration<Rep, Period>& rel_time)
{
return this->run_one_until(chrono::steady_clock::now() + rel_time);
}
template <typename Clock, typename Duration>
std::size_t io_context::run_one_until(
const chrono::time_point<Clock, Duration>& abs_time)
{
typename Clock::time_point now = Clock::now();
while (now < abs_time)
{
typename Clock::duration rel_time = abs_time - now;
if (rel_time > chrono::seconds(1))
rel_time = chrono::seconds(1);
boost::system::error_code ec;
std::size_t s = impl_.wait_one(
static_cast<long>(chrono::duration_cast<
chrono::microseconds>(rel_time).count()), ec);
boost::asio::detail::throw_error(ec);
if (s || impl_.stopped())
return s;
now = Clock::now();
}
return 0;
}
注意 1 秒的限制。我认为发送大缓冲区需要超过 1 秒并且执行停止。我可以按 64k 字节顺序执行 Write()
并且它有效。但我认为如果连接无法每秒发送 64k 字节,它将失败。所以,我需要一个更好的解决方案。
I think the reason is the implementation of boost::asio::io_context::run_one_for(). There it is (boost 1.67):
没有。原因是 boost::asio::[async_][read|write][_.*]
是组合操作。 run_one[_*]
只有 运行 一个处理程序,因此它不会完成一个组合的异步操作。
另请注意,boost::asio::write
是一个同步操作,与周围的任何逻辑无关,因此您的超时甚至不会起作用。
您可以将异步操作包装在本地 io_context 中并 运行 使用截止日期完成(因此,run_for
而不是 run_one_for
)。
您可以从 How to simulate boost::asio::write with a timeout 中收集到类似的概念 - 尽管使用 run_for(...)
比 run_one()
严格(很多)更好(出于上面解释的原因。在我写另一个答案的时候,run_for()
不存在,我可能不得不更新那个答案,因为那时我可能错过了组合操作的问题。