使用 ASIO 将超时连接尝试作为组合操作
Connection attempt with timeout as a composed operation using ASIO
我写了一个 class,用于尝试与 TCP 服务器建立连接,并支持自定义超时时间和多次尝试。它是一个 Callable 对象,返回一个 std::future
作为结果。
我最初实施的问题是:
- 该对象必须一直存活,直到连接建立、尝试次数用尽,或发生需要停止的错误。因此我不得不把它存储在我的 class 中,而这正是我想避免的。
- asio 的组合操作允许自定义返回时的控制流:CompletionToken 可以是一个简单的回调、一个 future,也可以配合协程使用。而在我的实现中,用户只能使用 future。
这是我对具有自定义超时和尝试次数的连接尝试的初始实现:
/**
 * Callable helper that tries to connect a `Connection` to an endpoint,
 * retrying with a per-attempt timeout, and reports the outcome through a
 * std::future<bool> (true on success, false when it gave up).
 *
 * The handlers passed to the timer and to the connection capture `this`, so
 * the object must stay alive, and must not be moved, until the returned future
 * becomes ready.
 *
 * @tparam Connection type providing `endpoint_type`, `get_executor()`,
 *                    `async_connect()` and `cancel()`.
 */
template<typename Connection>
class connection_attempt
{
public:
    using connection_type = Connection;
    using endpoint_type = typename Connection::endpoint_type;

    // The endpoint is deduced from a forwarding reference, so `Endpoint` may
    // be an lvalue-reference or cv-qualified type; compare the decayed type so
    // that lvalue endpoints are accepted as well.
    template<typename Endpoint>
    using require_endpoint = typename std::enable_if<
        std::is_same<typename std::decay<Endpoint>::type, endpoint_type>::value>::type;

    /// Timeout applied to each attempt when the caller does not pass one.
    constexpr static auto default_timeout()
    {
        return std::chrono::milliseconds(3000);
    }

    /// Sentinel attempt count (largest size_t) meaning "retry without limit".
    constexpr static size_t infinite_attempts()
    {
        return size_t() - 1;
    }

    explicit connection_attempt(Connection &connection)
        : connection_(connection)
    {}

    /**
     * @param connection  connection to establish
     * @param stopOnError predicate called with the error of a failed attempt;
     *                    returning true stops further attempts
     */
    template<typename Callable>
    explicit connection_attempt(Connection &connection,
                                Callable &&stopOnError)
        : connection_(connection),
          stopOnError_(std::forward<Callable>(stopOnError))
    {}

    /**
     * Starts connecting with a bounded number of attempts.
     *
     * `Duration` has a default template argument because a template parameter
     * cannot be deduced from a default function argument; without it the
     * defaulted timeout could never be used.
     *
     * @param endpoint endpoint to connect to
     * @param attempts maximum number of attempts
     * @param timeout  timeout applied to every single attempt
     * @return future that becomes ready when the procedure has finished
     */
    template<typename Endpoint,
             typename Duration = std::chrono::milliseconds,
             typename = require_endpoint<Endpoint>>
    std::future<bool> operator()(Endpoint &&endpoint,
                                 size_t attempts,
                                 Duration &&timeout = default_timeout())
    {
        // A fresh promise per run; the previous shared state is abandoned.
        connectionResult_ = {};
        asyncConnect(std::forward<Endpoint>(endpoint),
                     attempts,
                     std::forward<Duration>(timeout));
        return connectionResult_.get_future();
    }

    /**
     * Starts connecting with an unlimited number of attempts
     * (attempts = infinite_attempts()).
     *
     * @param endpoint endpoint to connect to
     * @param timeout  timeout applied to every single attempt
     * @return future that becomes ready when the procedure has finished
     */
    template<typename Endpoint,
             typename Duration = std::chrono::milliseconds,
             typename = require_endpoint<Endpoint>>
    std::future<bool> operator()(Endpoint endpoint,
                                 Duration &&timeout = default_timeout())
    {
        connectionResult_ = {};
        asyncConnect(std::forward<Endpoint>(endpoint),
                     infinite_attempts(),
                     std::forward<Duration>(timeout));
        return connectionResult_.get_future();
    }

private:
    connection_type &connection_;
    asio::steady_timer timer_
        {connection_.get_executor()}; // this does not compile -> {asio::get_associated_executor(connection_)};
    std::function<bool(const asio::error_code &)> stopOnError_;
    std::promise<bool> connectionResult_;

    /// Arms the timer; when it expires the pending connection attempt is cancelled.
    template<typename Duration>
    void startTimer(const Duration &timeout)
    {
        timer_.expires_after(timeout); // it will automatically cancel a pending timer
        timer_.async_wait(
            [this](const asio::error_code &errorCode)
            {
                // The wait was cancelled (stopTimer() or re-arming): nothing to do.
                if (errorCode == asio::error::operation_aborted)
                    return;
                // TODO: handle timer errors? What are the possible errors?
                assert(!errorCode && "unexpected timer error!");
                // stop current connection attempt
                connection_.cancel();
            });
    }

    void stopTimer()
    {
        timer_.cancel();
    }

    /**
     * Will be trying to connect until:<br>
     * - the connection succeeds<br>
     * - it has run out of attempts<br>
     * - it has been required to stop by the stopOnError callback (if it was set)
     * @param endpoint endpoint to connect to
     * @param attempts attempts left, including the one started by this call
     * @param timeout  timeout applied to this and every following attempt
     */
    template<typename Duration>
    void asyncConnect(endpoint_type endpoint,
                      size_t attempts,
                      Duration &&timeout)
    {
        startTimer(timeout);
        connection_.async_connect(endpoint, [this,
                                             endpoint,
                                             attempts,
                                             timeout = std::forward<Duration>(timeout)](const asio::error_code &errorCode)
        {
            if (!errorCode)
            {
                stopTimer();
                connectionResult_.set_value(true);
                return;
            }
            // An infinite budget is never decremented.
            const auto attemptsLeft = attempts == infinite_attempts() ?
                                      infinite_attempts() :
                                      attempts - 1;
            if ((stopOnError_ &&
                 stopOnError_(errorCode == asio::error::operation_aborted ?
                              // special case for operation_aborted on timer expiration - need to send timed_out explicitly
                              // this should only be resulted from the timer calling cancel()
                              asio::error::timed_out :
                              errorCode)) ||
                !attemptsLeft)
            {
                stopTimer();
                connectionResult_.set_value(false);
                return;
            }
            asyncConnect(endpoint,
                         attemptsLeft,
                         timeout);
        });
    }
};
// NOTE: ideally this would be an asynchronous initiating function that accepts
// a custom CompletionToken instead of returning the callable itself.
/**
 * Creates a connection_attempt for `connection`.
 *
 * @param connection  connection the attempt operates on (held by reference)
 * @param stopOnError predicate forwarded to the connection_attempt constructor
 * @return the newly constructed connection_attempt<Connection>
 */
template<typename Connection, typename Callable>
auto make_connection_attempt(Connection &connection,
                             Callable &&stopOnError) -> connection_attempt<Connection>
{
    using attempt_type = connection_attempt<Connection>;
    return attempt_type(connection, std::forward<Callable>(stopOnError));
}
但是,我希望与 ASIO 及其 Universal Model for Asynchronous Operations 保持一致:返回时的控制流应该是可定制的。
我已经研究过这个 example:它使用带状态的中间处理程序的组合操作,按固定间隔发送多条消息。该处理程序递归地把自身作为下一个异步操作的处理程序传递:async_wait
和 async_write
。这些调用总是交替进行:总是在一个返回之后才调用另一个。然而,在我的例子中,async_wait
和 async_connect
被同时调用:
// Illustrative sketch only (not meant to compile): shows why a single
// move-only handler object cannot be handed to two concurrent operations.
// initiation method, called first
void operator()(args...)
{
    // not valid!
    timer.async_wait(std::move(*this)); // from now on this is invalid
    connection.async_connect(endpoint, std::move(*this)); // can't move this twice
}
这是我尝试实现的 class 的代码,它同时充当启动函数和中间处理程序:
/**
 * Work-in-progress sketch of a composed connection attempt that is meant to
 * act both as the initiation object and as the intermediate completion
 * handler. The bodies of both call operators are still placeholders.
 *
 * @tparam Connection      type providing `endpoint_type`, `executor_type` and
 *                         `get_executor()`
 * @tparam CompletionToken token type used to look up the associated executor
 *                         and allocator
 */
template<typename Connection, typename CompletionToken>
class composed_connection_attempt
{
public:
    using connection_type = Connection;
    using endpoint_type = typename Connection::endpoint_type;

    /// Progress marker intended to tell the possible outcomes apart.
    enum class state
    {
        pending,
        connected,
        timeout
    };

    /// Timeout used when the caller does not pass one.
    constexpr static auto default_timeout()
    {
        return std::chrono::milliseconds(3000);
    }

    /// Sentinel attempt count (largest size_t) meaning "retry without limit".
    constexpr static size_t infinite_attempts()
    {
        return size_t() - 1;
    }

    // TODO: executor type
    using executor_type = asio::associated_executor_t<CompletionToken,
                                                      typename connection_type::executor_type>;

    executor_type get_executor() const noexcept
    {
        // TODO: get completion handler executor
        return connection_.get_executor();
    }

    // TODO: allocator type
    using allocator_type = typename asio::associated_allocator_t<CompletionToken,
                                                                 std::allocator<void>>;

    allocator_type get_allocator() const noexcept
    {
        // TODO: get completion handler allocator
        return allocator_type();
    }

    // TODO: constructor to initialize state, pass timeout value?
    explicit composed_connection_attempt(connection_type &connection)
        : connection_(connection)
    {}

    /**
     * @param connection  connection to establish
     * @param stopOnError predicate stored for deciding whether to stop retrying
     */
    template<typename Callable>
    composed_connection_attempt(connection_type &connection, Callable &&stopOnError)
        : connection_(connection),
          stopOnError_(std::forward<Callable>(stopOnError))
    {}

    /**
     * Initiation operator (not implemented yet).
     *
     * `Duration` has a default template argument because a template parameter
     * cannot be deduced from a default function argument; without it the
     * defaulted timeout could never be used.
     */
    template<typename Endpoint, typename Duration = std::chrono::milliseconds>
    void operator()(Endpoint &&endpoint,
                    size_t attempts,
                    Duration timeout = default_timeout())
    {
        // Start timer: how to pass this
        // Attempt connection
    }

    // intermediate completion handler
    // this may be invoked without an error both by the timer and a connection
    void operator()(const asio::error_code &errorCode)
    {
        if (!errorCode)
        {
        }
    }

private:
    Connection &connection_;
    asio::steady_timer timer_{this->get_executor()};
    // NOTE(review): std::atomic is neither copyable nor movable, so this member
    // makes the whole class non-movable - confirm that is acceptable for an
    // object intended to be passed on as a completion handler.
    std::atomic<state> state_{state::pending};
    std::function<bool(const asio::error_code &)> stopOnError_;
    std::function<void(const asio::error_code &)> completionHandler_;
};
所以,我要解决的问题:
- 如何与计时器和连接(套接字)共享有状态中间处理程序的所有权?也许我必须使用嵌套 classes(main class 用于启动,嵌套用于定时器和套接字事件)?
- 如何确定是哪个异步调用触发了
void operator()(const asio::error_code&)
的调用?没有错误既可能表示连接成功,也可能表示超时。两种异步操作在被取消时也都可能返回 asio::error::operation_aborted
:连接尝试会在超时时被取消,计时器会在连接成功或连接出错时被取消。
因此,对于第二个问题,我建议增加一个用于区分的参数(有时我会使用空的“状态结构体”,如 State::Init{}
或 State::Timeout{}
,既有助于重载决议,也能让代码自我说明)。
对于第一个问题,我想您很可能已经遇到过 std::enable_shared_from_this
。
这是我对“通用模型”的理解。为了便于说明,我使用了 spawn
。
/**
 * Initiating function: tries to connect `object` to `ep`, one attempt at a
 * time, each guarded by a timer of length `delay`. The work runs in a stackful
 * coroutine spawned on the socket's executor.
 *
 * @param object   socket to connect
 * @param ep       endpoint to connect to
 * @param stopOn   predicate called with the error of a failed attempt;
 *                 returning true ends the operation with that error
 * @param token    completion token for the signature void(error_code, bool)
 * @param attempts number of attempts; the loop ends when the count reaches zero
 * @param delay    timeout applied to every attempt
 * @return whatever the async_result for `token` yields
 */
template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep,
                              F&& stopOn, Token&& token,
                              int attempts = -1,
                              Timer::duration delay = 3s)
{
    using Result     = asio::async_result<std::decay_t<Token>,
                                          void(error_code, bool)>;
    using Completion = typename Result::completion_handler_type;

    Completion completion(std::forward<Token>(token));
    Result     result(completion);

    asio::spawn(
        object.get_executor(),
        [=, &object](asio::yield_context yield) mutable {
            using mylib::result_code;

            auto       executor = get_associated_executor(yield);
            error_code connect_ec;

            for (; attempts != 0; --attempts) {
                // The timer is destroyed at the end of each iteration.
                Timer watchdog(executor, delay);
                watchdog.async_wait([&](error_code timer_ec) {
                    if (timer_ec)
                        return;
                    object.cancel(); // timer expired: abort the pending connect
                });

                object.async_connect(ep, yield[connect_ec]);

                if (!connect_ec)
                    return completion(result_code::ok, true);

                // A cancelled connect is reported to the caller as a timeout.
                if (connect_ec == asio::error::operation_aborted) {
                    connect_ec = result_code::timeout;
                }

                if (stopOn && stopOn(connect_ec))
                    return completion(connect_ec, false);

                object.close();
            }

            return completion(result_code::attempts_exceeded, false);
        });

    return result.get();
}
需要注意的关键事项是:
async_result<>
协议将为您提供一个完成处理程序,该处理程序可以“执行调用者所需的魔法”(use_future、yield_context 等)
- 你应该可以不必使用共享所有权,因为计时器“只需”持有一个原始指针即可:计时器的生命周期完全由包含它的组合操作拥有。
完整演示:回调、协程和 future
我加入了一个 mylib::result_code
枚举,以便能够返回完整的错误信息:
//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>
#include <iomanip>
#ifdef STANDALONE_ASIO
using std::error_category;
using std::error_code;
using std::error_condition;
using std::system_error;
#else
namespace asio = boost::asio;
using boost::system::error_category;
using boost::system::error_code;
using boost::system::error_condition;
using boost::system::system_error;
#endif
using namespace std::chrono_literals;
using asio::ip::tcp;
using Timer = asio::steady_timer;
namespace mylib { // threw in the kitchen sink for error codes
enum class result_code {
ok = 0,
timeout = 1,
attempts_exceeded = 2,
};
auto const& get_result_category() {
struct impl : error_category {
const char* name() const noexcept override { return "result_code"; }
std::string message(int ev) const override {
switch (static_cast<result_code>(ev)) {
case result_code::ok: return "success";
case result_code::attempts_exceeded:
return "the maximum number of attempts was exceeded";
case result_code::timeout:
return "the operation did not complete in time";
default: return "unknown error";
}
}
error_condition
default_error_condition(int ev) const noexcept override {
return error_condition{ev, *this};
}
bool equivalent(int ev, error_condition const& condition)
const noexcept override {
return condition.value() == ev && &condition.category() == this;
}
bool equivalent(error_code const& error,
int ev) const noexcept override {
return error.value() == ev && &error.category() == this;
}
} const static instance;
return instance;
}
error_code make_error_code(result_code se) {
return error_code{
static_cast<std::underlying_type<result_code>::type>(se),
get_result_category()};
}
} // namespace mylib
// Registers mylib::result_code as an error-code enum so that it converts
// implicitly to error_code. The trait must belong to the same library as the
// error_code alias chosen by the STANDALONE_ASIO switch earlier in this file.
#ifdef STANDALONE_ASIO
template <>
struct std::is_error_code_enum<mylib::result_code>
    : std::true_type {};
#else
template <>
struct boost::system::is_error_code_enum<mylib::result_code>
    : std::true_type {};
#endif
/**
 * Initiating function: tries to connect `object` to `ep`, one attempt at a
 * time, each guarded by a timer of length `delay`. The work runs in a stackful
 * coroutine spawned on the socket's executor.
 *
 * @param object   socket to connect
 * @param ep       endpoint to connect to
 * @param stopOn   predicate called with the error of a failed attempt;
 *                 returning true ends the operation with that error
 * @param token    completion token for the signature void(error_code, bool)
 * @param attempts number of attempts; the loop ends when the count reaches zero
 * @param delay    timeout applied to every attempt
 * @return whatever the async_result for `token` yields
 */
template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep,
                              F&& stopOn, Token&& token,
                              int attempts = -1,
                              Timer::duration delay = 3s)
{
    using Result     = asio::async_result<std::decay_t<Token>,
                                          void(error_code, bool)>;
    using Completion = typename Result::completion_handler_type;

    Completion completion(std::forward<Token>(token));
    Result     result(completion);

    asio::spawn(
        object.get_executor(),
        [=, &object](asio::yield_context yield) mutable {
            using mylib::result_code;

            auto       executor = get_associated_executor(yield);
            error_code connect_ec;

            for (; attempts != 0; --attempts) {
                // The timer is destroyed at the end of each iteration.
                Timer watchdog(executor, delay);
                watchdog.async_wait([&](error_code timer_ec) {
                    if (timer_ec)
                        return;
                    object.cancel(); // timer expired: abort the pending connect
                });

                object.async_connect(ep, yield[connect_ec]);

                if (!connect_ec)
                    return completion(result_code::ok, true);

                // A cancelled connect is reported to the caller as a timeout.
                if (connect_ec == asio::error::operation_aborted) {
                    connect_ec = result_code::timeout;
                }

                if (stopOn && stopOn(connect_ec))
                    return completion(connect_ec, false);

                object.close();
            }

            return completion(result_code::attempts_exceeded, false);
        });

    return result.get();
}
/// Stop predicate used by the demos: logs the error under consideration to
/// stderr and always answers "do not stop", so every failure is retried.
static auto non_recoverable = [](error_code ec) {
    // TODO Be specific about intermittent/recoverable conditions
    const bool stop_retrying = false;
    std::cerr << "Checking " << std::quoted(ec.message()) << "\n";
    return stop_retrying;
};
#include <set>
int main(int argc, char** argv) {
assert(argc>1);
static const tcp::endpoint ep{asio::ip::make_address(argv[1]),
8989};
std::set<std::string_view> const options{argv+1, argv+argc};
std::cout << std::boolalpha;
if (options.contains("future")) {
std::cout
<< "-----------------------\n"
<< " FUTURE DEMO\n"
<< "-----------------------" << std::endl;
asio::thread_pool ctx;
try {
tcp::socket s(ctx);
std::future<bool> ok = async_connection_attempt(
s, ep, non_recoverable, asio::use_future, 5, 800ms);
std::cout << "Future: " << ok.get() << ", " << s.is_open() << "\n";
} catch (system_error const& se) {
std::cout << "Future: " << se.code().message() << "\n";
}
ctx.join();
}
if (options.contains("coroutine")) {
std::cout
<< "-----------------------\n"
<< " COROUTINE DEMO\n"
<< "-----------------------" << std::endl;
asio::io_context ctx;
asio::spawn(ctx,
[work = make_work_guard(ctx)](asio::yield_context yc) {
auto ex = get_associated_executor(yc);
tcp::socket s(ex);
error_code ec;
if (async_connection_attempt(s, ep, non_recoverable,
yc[ec], 5, 800ms)) {
std::cout << "Connected in coro\n";
} else {
std::cout << "NOT Connected in coro: " << ec.message() << "\n";
}
});
ctx.run();
}
if (options.contains("callback")) {
std::cout
<< "-----------------------\n"
<< " CALLBACK DEMO\n"
<< "-----------------------" << std::endl;
asio::io_context ctx;
tcp::socket s(ctx);
async_connection_attempt(
s, ep, non_recoverable,
[](error_code ec, bool ok) {
std::cout << "Callback: " << ok << ", "
<< ec.message() << "\n";
},
5, 800ms);
ctx.run();
}
}
示例输出在在线编译器上,或者在我的机器上比较一些测试:
我就是这样实现的。可以找到带有测试的代码 here on github
/**
 * Composed "connect with timeout and retries" operation.
 *
 * The object is both the initiation object (the templated call operators) and
 * the intermediate completion handler passed to `async_connect`
 * (`operator()(const asio::error_code&)`). All state lives in a shared `impl`
 * so that the timer handler and the moved-along handler refer to the same
 * data.
 *
 * @tparam Connection        type providing `endpoint_type`, `executor_type`,
 *                           `get_executor()`, `async_connect()` and `cancel()`
 * @tparam CompletionHandler final handler, invoked with an `asio::error_code`
 */
template<typename Connection, typename CompletionHandler>
class composed_connection_attempt
{
public:
    using connection_type = Connection;
    using endpoint_type = typename Connection::endpoint_type;
    // TODO: clarify the type!
    using completion_handler_t = CompletionHandler;

    /// Timeout applied to each attempt when the caller does not pass one.
    constexpr static auto default_timeout()
    {
        return std::chrono::milliseconds(3000);
    }

    /// Sentinel attempt count (largest size_t) meaning "retry without limit".
    constexpr static size_t infinite_attempts()
    {
        return size_t() - 1;
    }

    using executor_type = asio::associated_executor_t<
        typename std::decay<CompletionHandler>::type,
        typename connection_type::executor_type>;

    executor_type get_executor() const noexcept
    {
        return pImpl_->get_executor();
    }

    // Decayed in the same way as executor_type, so both aliases agree when
    // CompletionHandler is a reference type.
    using allocator_type = asio::associated_allocator_t<
        typename std::decay<CompletionHandler>::type,
        std::allocator<void>>;

    allocator_type get_allocator() const noexcept
    {
        return pImpl_->get_allocator();
    }

    // TODO: constructor to initialize state, pass timeout value?
    template<typename CompletionHandlerT>
    explicit composed_connection_attempt(connection_type &connection,
                                         CompletionHandlerT &&completionHandler)
        : pImpl_(std::make_shared<impl>(connection,
                                        std::forward<CompletionHandlerT>(completionHandler)))
    {}

    template<typename CompletionHandlerT,
             typename Callable>
    explicit composed_connection_attempt(connection_type &connection,
                                         CompletionHandlerT &&completionHandler,
                                         Callable &&stopOnError)
        : pImpl_(std::make_shared<impl>(connection,
                                        std::forward<CompletionHandlerT>(completionHandler),
                                        std::forward<Callable>(stopOnError)))
    {}

    /**
     * Initiation operator. Initiates composed connection procedure.
     *
     * `Duration` has a default template argument because a template parameter
     * cannot be deduced from a default function argument; without it the
     * defaulted timeout could never be used.
     *
     * @tparam Endpoint type of endpoint
     * @tparam Duration type of timeout
     * @param endpoint endpoint to be used for connection
     * @param attempts number of attempts
     * @param timeout value to be used as a timeout for each attempt
     */
    // TODO: require endpoint type
    template<typename Endpoint, typename Duration = std::chrono::milliseconds>
    void operator()(Endpoint &&endpoint,
                    size_t attempts,
                    Duration &&timeout = default_timeout())
    {
        pImpl_->endpoint_ = std::forward<Endpoint>(endpoint);
        pImpl_->attempts_ = attempts;
        pImpl_->timeout_ = std::forward<Duration>(timeout);
        asyncConnect();
    }

    /**
     * Initiation operator. Initiates composed connection procedure. Connection attempts default to infinite.
     * @tparam Endpoint type of endpoint
     * @tparam Duration type of timeout
     * @param endpoint endpoint to be used for connection
     * @param timeout value to be used as a timeout for each attempt
     */
    // TODO: require endpoint type
    template<typename Endpoint, typename Duration = std::chrono::milliseconds>
    void operator()(Endpoint &&endpoint,
                    Duration &&timeout = default_timeout())
    {
        pImpl_->endpoint_ = std::forward<Endpoint>(endpoint);
        pImpl_->timeout_ = std::forward<Duration>(timeout);
        asyncConnect();
    }

    /**
     * Intermediate completion handler. Will be trying to connect until:<br>
     * - has connected<br>
     * - has run out of attempts<br>
     * - user-provided callback #impl::stopOnError_ interrupts execution when a specific connection error has occurred<br>
     * <br>Will be invoked only on connection events:<br>
     * - success<br>
     * - connection timeout or operation_cancelled in case if timer has expired<br>
     * - connection errors<br>
     * @param errorCode error code resulted from async_connect
     */
    void operator()(const asio::error_code &errorCode)
    {
        if (!errorCode)
        {
            stopTimer();
            pImpl_->completionHandler_(errorCode);
            return;
        }

        // special case for operation_aborted on timer expiration - need to send timed_out explicitly
        // this should only be resulted from the timer calling cancel()
        const asio::error_code reported = errorCode == asio::error::operation_aborted ?
                                          asio::error::timed_out :
                                          errorCode;

        // An infinite budget is never decremented.
        const auto attemptsLeft = pImpl_->attempts_ == infinite_attempts() ?
                                  infinite_attempts() :
                                  pImpl_->attempts_ - 1;

        if ((pImpl_->stopOnError_ && pImpl_->stopOnError_(reported)) ||
            !attemptsLeft)
        {
            stopTimer();
            pImpl_->completionHandler_(reported);
            return;
        }

        pImpl_->attempts_ = attemptsLeft;
        asyncConnect();
    }

private:
    /// State shared between the handler passed to async_connect and the timer handler.
    struct impl
    {
        template<typename CompletionHandlerT>
        impl(connection_type &connection,
             CompletionHandlerT &&completionHandler)
            : connection_(connection),
              completionHandler_(std::forward<CompletionHandlerT>(completionHandler))
        {}

        template<typename CompletionHandlerT, typename Callable>
        impl(connection_type &connection,
             CompletionHandlerT &&completionHandler,
             Callable &&stopOnError)
            : connection_(connection),
              completionHandler_(std::forward<CompletionHandlerT>(completionHandler)),
              stopOnError_(std::forward<Callable>(stopOnError))
        {}

        /// Executor associated with the final handler, falling back to the connection's.
        executor_type get_executor() const noexcept
        {
            return asio::get_associated_executor(completionHandler_,
                                                 connection_.get_executor());
        }

        /// Allocator associated with the final handler, falling back to std::allocator<void>.
        allocator_type get_allocator() const noexcept
        {
            return asio::get_associated_allocator(completionHandler_,
                                                  std::allocator<void>());
        }

        connection_type &connection_;
        completion_handler_t completionHandler_;
        std::function<bool(const asio::error_code &)> stopOnError_;
        // this should be default constructable or should I pass it in the constructor?
        endpoint_type endpoint_;
        // TODO: make timer initialization from get_executor()
        asio::steady_timer timer_{connection_.get_executor()}; // this does not compile! -> {get_executor()};
        asio::steady_timer::duration timeout_ = default_timeout();
        size_t attempts_ = infinite_attempts();
    };

    // TODO: make unique?
    std::shared_ptr<impl> pImpl_;

    /// Arms the timer; when it expires the pending connection attempt is cancelled.
    void startTimer()
    {
        pImpl_->timer_.expires_after(pImpl_->timeout_); // it will automatically cancel a pending timer
        pImpl_->timer_.async_wait(
            [pImpl = pImpl_](const asio::error_code &errorCode)
            {
                // The wait was cancelled (stopTimer() or re-arming): nothing to do.
                if (errorCode == asio::error::operation_aborted)
                    return;
                // TODO: handle timer errors? What are the possible errors?
                assert(!errorCode && "unexpected timer error!");
                // stop attempts
                pImpl->connection_.cancel();
            });
    }

    void stopTimer()
    {
        pImpl_->timer_.cancel();
    }

    /**
     * Starts one connection attempt guarded by the timer and hands this object
     * over to async_connect as its completion handler.
     */
    void asyncConnect()
    {
        startTimer();
        // Resolve everything reached through pImpl_ before *this is moved:
        // if the handler parameter were move-constructed while the arguments
        // are evaluated, pImpl_ could already be empty when endpoint_ is read.
        impl &shared = *pImpl_;
        shared.connection_.async_connect(shared.endpoint_, std::move(*this));
    }
};
/**
 * Creates a composed_connection_attempt for `connection`.
 *
 * The handler type is decayed: `CompletionHandler` is deduced from a
 * forwarding reference, so an lvalue argument would otherwise instantiate the
 * operation with a reference type and make it store a reference to the
 * caller's handler instead of owning a copy.
 *
 * @param connection        connection the operation works on
 * @param completionHandler final handler, copied or moved into the operation
 * @param stopOnError       predicate forwarded to the operation's constructor
 */
template<typename Connection,
         typename CompletionHandler,
         typename Callable>
auto make_composed_connection_attempt(Connection &connection,
                                      CompletionHandler &&completionHandler,
                                      Callable &&stopOnError) ->
composed_connection_attempt<Connection, typename std::decay<CompletionHandler>::type>
{
    using operation_type =
        composed_connection_attempt<Connection, typename std::decay<CompletionHandler>::type>;
    return operation_type(connection,
                          std::forward<CompletionHandler>(completionHandler),
                          std::forward<Callable>(stopOnError));
}
template<typename Connection,
typename Endpoint,
typename Duration,
typename CompletionToken,
typename Callable>
auto async_connection_attempt(Connection &connection,
Endpoint &&endpoint,
size_t attempts,
Duration &&timeout,
CompletionToken &&completionToken,
Callable &&stopOnError)
{
using result_t = asio::async_result<std::decay_t<CompletionToken>,
void(asio::error_code)>;
using completion_t = typename result_t::completion_handler_type;
completion_t completion{std::forward<CompletionToken>(completionToken)};
result_t result{completion};
auto composedConnectionAttempt = make_composed_connection_attempt(connection,
std::forward<completion_t>(completion),
std::forward<Callable>(stopOnError));
composedConnectionAttempt(std::forward<Endpoint>(endpoint),
attempts,
std::forward<Duration>(timeout));
return result.get();
}
终于抽出时间来看这个了:
Wow. I've just created the same without using spawn (using an operation type that uses the State struct arguments as I mentioned). I must say the complexity of this kind of library-implementor stuff keeps surprising me. I managed to avoid the overhead of shared_from_this though, and of course all demos still pass, so I'm pretty content. If you want I can post it as an alternative answer. – sehe
启动函数
启动函数大致相同,只是不再使用spawn
(意味着用户不必选择加入 Boost Coroutine 和 Boost Context)。
/**
 * Initiating function: builds a heap-allocated connection_attempt_op and
 * starts it. Ownership of the operation is handed to the operation itself and
 * travels along its chain of handlers.
 *
 * @param object   socket to connect
 * @param ep       endpoint to connect to
 * @param stopOn   predicate stored in the operation (decayed copy)
 * @param token    completion token for the signature void(error_code, bool)
 * @param attempts attempt count passed to the operation
 * @param delay    per-attempt timer duration passed to the operation
 * @return whatever the async_result for `token` yields
 */
template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep, F&& stopOn,
                              Token&& token, int attempts = -1,
                              Timer::duration delay = 3s) {
    using Result     = asio::async_result<std::decay_t<Token>,
                                          void(error_code, bool)>;
    using Completion = typename Result::completion_handler_type;
    using Operation  = mylib::connection_attempt_op<std::decay_t<F>, Completion>;

    Completion completion(std::forward<Token>(token));
    Result     result(completion);

    // The unique_ptr is the owning "self" passed along a single async call chain.
    auto op = std::make_unique<Operation>(object, ep, std::forward<F>(stopOn),
                                          completion, attempts, delay);
    // Kick off via the Init overload; it takes ownership out of `op`.
    op->operator()(op);

    return result.get();
}
现在,您会立即注意到我使用了独占所有权的容器(unique_ptr
)。我曾尝试创建一个值语义的操作 class,以仅可移动的方式封装处理程序,从而避免动态分配。
但是,该操作同时也拥有计时器对象,而计时器在回调中需要保持引用稳定。所以,移动对象行不通。当然,我们仍然可以使用一个可移动的值类型操作,让它只包含一个 unique_ptr
用于 _timer
,但这样开销相同,而且通用性更差:
如果我们向操作状态中再添加一个 IO 对象,就需要更多的动态分配
移动一个 unique_ptr 比移动一个体积较大
的状态对象要便宜得多
在成员函数中移动 this
指向的对象非常容易出错。例如,下面的写法会引发未定义行为:
bind_executor(_ex, std::bind(std::move(*this), ...))
那是因为 _ex
实际上是 this->_ex
,但求值顺序未作规定,所以 this->_ex
可能在移动之后才被求值。
这正是我们应当避免的那种“搬起石头砸自己的脚”(footgun)式的陷阱。
如果我们实现其他异步操作,我们可以使用相同的模式。
操作Class
您将从您的原始代码中识别出这一点。我选择使用我自己的建议来 select operator()
通过调度标记“状态”类型的重载:
// Empty tag types: each one selects a different operator() overload of the
// operation, standing for one stage of the state machine.
struct Init {};     // initiation
struct Attempt {};  // start of a connection attempt
struct Response {}; // completion of an async_connect
为了帮助绑定到我们自己,我们还将拥有的 unique_ptr 作为 self
参数传递:
// Owning handle to the operation.
using Self = std::unique_ptr<connection_attempt_op>;

// Callable that owns the operation and, when invoked, re-enters it with the
// owning pointer as first argument followed by the forwarded arguments.
struct Binder {
    Self _self;

    template <typename... Ts>
    decltype(auto) operator()(Ts&&... values) {
        auto& op = *_self;
        return op(_self, std::forward<Ts>(values)...);
    }
};
由于 std::bind
的限制,我们无法传递实际的右值参数,但是
如果我们注意调用链是严格顺序的,我们总是可以从 self
恰好移动一次
由于 unique_ptr 的间接性,在移动 self
后从方法体中使用 this
仍然是安全的
我们现在可以在 _timer.async_wait
的完成处理程序中捕获 this
的稳定值!只要我们保证完成处理程序不会超过 self
的生命周期,我们就不必在这里共享所有权。
shared_ptr
dependence averted!
考虑到这些,我认为完整的实施没有什么惊喜:
namespace mylib { // implementation details
// State machine for one "connect with timeout and retries" operation.
// The object is heap-allocated and owned by a unique_ptr (`Self`) that is
// moved from handler to handler; the overloads of operator() are the stages,
// selected by the tag types Init / Attempt / Response.
//
// F          - type of the stop predicate, called with the error of a failed attempt
// Completion - final handler, invoked as handler(error_code, bool)
template <typename F, typename Completion> struct connection_attempt_op {
tcp::socket& _object;   // socket being connected (not owned)
tcp::endpoint _ep;      // endpoint to connect to
F _stopOn;              // returns true to stop retrying
Completion _handler;    // final completion handler
int _attempts;          // remaining attempts; tested for non-zero, then decremented
Timer::duration _delay; // timer duration for each attempt
// All handlers of this operation are run through a strand over the socket's
// executor.
using executor_type =
asio::strand<std::decay_t<decltype(_object.get_executor())>>;
executor_type _ex;
// Heap-allocated timer, recreated for every attempt.
std::unique_ptr<Timer> _timer;
executor_type const& get_executor() { return _ex; }
explicit connection_attempt_op(tcp::socket& object, tcp::endpoint ep,
F stopOn, Completion handler,
int attempts, Timer::duration delay)
: _object(object),
_ep(ep),
_stopOn(std::move(stopOn)),
_handler(std::move(handler)),
_attempts(attempts),
_delay(delay),
_ex(object.get_executor()) {}
// Tag types selecting the operator() overload for each stage.
struct Init {};
struct Attempt {};
struct Response {};
using Self = std::unique_ptr<connection_attempt_op>;
// Callable that owns the operation; invoking it re-enters the operation with
// the owning pointer as first argument.
struct Binder {
Self _self;
template <typename... Args>
decltype(auto) operator()(Args&&... args) {
return (*_self)(_self, std::forward<Args>(args)...);
}
};
// Stage Init: takes ownership out of `self` and continues with Attempt on the
// strand.
void operator()(Self& self, Init = {}) {
// This is the only invocation perhaps not yet on the strand, so
// dispatch
asio::dispatch(_ex, std::bind(Binder{std::move(self)}, Attempt{}));
}
// Stage Attempt: if attempts remain, starts a timer and an async_connect whose
// handler (stage Response) takes over ownership; otherwise completes with
// attempts_exceeded.
void operator()(Self& self, Attempt) {
if (_attempts--) {
_timer = std::make_unique<Timer>(_ex, _delay);
// Timer expiry (no error) cancels the pending connect. The handler captures a
// raw `this`, so it relies on the operation object still being alive when it
// runs.
_timer->async_wait([this](error_code ec) {
if (!ec) _object.cancel();
});
// Members are still accessed through `this` while `self` is being moved in
// this expression; that is fine because moving the unique_ptr does not move
// the object it points to.
_object.async_connect(
_ep,
asio::bind_executor(
_ex, // _object may not already have been on strand
std::bind(Binder{std::move(self)}, Response{},
std::placeholders::_1)));
} else {
_handler(mylib::result_code::attempts_exceeded, false);
}
}
// Stage Response: handles the result of async_connect.
void operator()(Self& self, Response, error_code ec) {
if (!ec) {
_timer.reset();
return _handler(result_code::ok, true);
}
// A cancelled connect is reported as a timeout.
if (ec == asio::error::operation_aborted) {
ec = result_code::timeout;
}
if (_stopOn && _stopOn(ec))
return _handler(ec, false);
// Retry: drop the timer, close the socket and start the next attempt.
_timer.reset();
_object.close();
operator()(self, Attempt{});
}
};
}
Do note the executor binding; the comment in the Init{}
overload as well as with the bind_executor
are relevant here.
The strand is essential to maintaining the lifetime guarantees that we needed w.r.t. the async_wait
operation. In particular, we need the handler ordering that the strand provides.
演示时间
代码的其余部分与其他答案 100% 相同,所以让我们在没有进一步评论的情况下展示它:
//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
//#define BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>
#include <iomanip>
#ifdef STANDALONE_ASIO
using std::error_category;
using std::error_code;
using std::error_condition;
using std::system_error;
#else
namespace asio = boost::asio;
using boost::system::error_category;
using boost::system::error_code;
using boost::system::error_condition;
using boost::system::system_error;
#endif
using namespace std::chrono_literals;
using asio::ip::tcp;
using Timer = asio::steady_timer;
namespace mylib { // threw in the kitchen sink for error codes
enum class result_code {
ok = 0,
timeout = 1,
attempts_exceeded = 2,
not_implemented = 3,
};
auto const& get_result_category() {
struct impl : error_category {
const char* name() const noexcept override { return "result_code"; }
std::string message(int ev) const override {
switch (static_cast<result_code>(ev)) {
case result_code::ok: return "success";
case result_code::attempts_exceeded:
return "the maximum number of attempts was exceeded";
case result_code::timeout:
return "the operation did not complete in time";
case result_code::not_implemented:
return "feature not implemented";
default: return "unknown error";
}
}
error_condition
default_error_condition(int ev) const noexcept override {
return error_condition{ev, *this};
}
bool equivalent(int ev, error_condition const& condition)
const noexcept override {
return condition.value() == ev && &condition.category() == this;
}
bool equivalent(error_code const& error,
int ev) const noexcept override {
return error.value() == ev && &error.category() == this;
}
} const static instance;
return instance;
}
error_code make_error_code(result_code se) {
return error_code{
static_cast<std::underlying_type<result_code>::type>(se),
get_result_category()};
}
} // namespace mylib
// Registers mylib::result_code as an error-code enum so that it converts
// implicitly to error_code. The trait must belong to the same library as the
// error_code alias chosen by the STANDALONE_ASIO switch earlier in this file.
#ifdef STANDALONE_ASIO
template <>
struct std::is_error_code_enum<mylib::result_code>
    : std::true_type {};
#else
template <>
struct boost::system::is_error_code_enum<mylib::result_code>
    : std::true_type {};
#endif
namespace mylib { // implementation details
// State machine for one "connect with timeout and retries" operation.
// The object is heap-allocated and owned by a unique_ptr (`Self`) that is
// moved from handler to handler; the overloads of operator() are the stages,
// selected by the tag types Init / Attempt / Response.
//
// F          - type of the stop predicate, called with the error of a failed attempt
// Completion - final handler, invoked as handler(error_code, bool)
template <typename F, typename Completion> struct connection_attempt_op {
tcp::socket& _object;   // socket being connected (not owned)
tcp::endpoint _ep;      // endpoint to connect to
F _stopOn;              // returns true to stop retrying
Completion _handler;    // final completion handler
int _attempts;          // remaining attempts; tested for non-zero, then decremented
Timer::duration _delay; // timer duration for each attempt
// All handlers of this operation are run through a strand over the socket's
// executor.
using executor_type =
asio::strand<std::decay_t<decltype(_object.get_executor())>>;
executor_type _ex;
// Heap-allocated timer, recreated for every attempt.
std::unique_ptr<Timer> _timer;
executor_type const& get_executor() { return _ex; }
explicit connection_attempt_op(tcp::socket& object, tcp::endpoint ep,
F stopOn, Completion handler,
int attempts, Timer::duration delay)
: _object(object),
_ep(ep),
_stopOn(std::move(stopOn)),
_handler(std::move(handler)),
_attempts(attempts),
_delay(delay),
_ex(object.get_executor()) {}
// Tag types selecting the operator() overload for each stage.
struct Init {};
struct Attempt {};
struct Response {};
using Self = std::unique_ptr<connection_attempt_op>;
// Callable that owns the operation; invoking it re-enters the operation with
// the owning pointer as first argument.
struct Binder {
Self _self;
template <typename... Args>
decltype(auto) operator()(Args&&... args) {
return (*_self)(_self, std::forward<Args>(args)...);
}
};
// Stage Init: takes ownership out of `self` and continues with Attempt on the
// strand.
void operator()(Self& self, Init = {}) {
// This is the only invocation perhaps not yet on the strand, so
// dispatch
asio::dispatch(_ex, std::bind(Binder{std::move(self)}, Attempt{}));
}
// Stage Attempt: if attempts remain, starts a timer and an async_connect whose
// handler (stage Response) takes over ownership; otherwise completes with
// attempts_exceeded.
void operator()(Self& self, Attempt) {
if (_attempts--) {
_timer = std::make_unique<Timer>(_ex, _delay);
// Timer expiry (no error) cancels the pending connect. The handler captures a
// raw `this`, so it relies on the operation object still being alive when it
// runs.
_timer->async_wait([this](error_code ec) {
if (!ec) _object.cancel();
});
// Members are still accessed through `this` while `self` is being moved in
// this expression; that is fine because moving the unique_ptr does not move
// the object it points to.
_object.async_connect(
_ep,
asio::bind_executor(
_ex, // _object may not already have been on strand
std::bind(Binder{std::move(self)}, Response{},
std::placeholders::_1)));
} else {
_handler(mylib::result_code::attempts_exceeded, false);
}
}
// Stage Response: handles the result of async_connect.
void operator()(Self& self, Response, error_code ec) {
if (!ec) {
_timer.reset();
return _handler(result_code::ok, true);
}
// A cancelled connect is reported as a timeout.
if (ec == asio::error::operation_aborted) {
ec = result_code::timeout;
}
if (_stopOn && _stopOn(ec))
return _handler(ec, false);
// Retry: drop the timer, close the socket and start the next attempt.
_timer.reset();
_object.close();
operator()(self, Attempt{});
}
};
}
/**
 * Initiating function: builds a heap-allocated connection_attempt_op and
 * starts it. Ownership of the operation is handed to the operation itself and
 * travels along its chain of handlers.
 *
 * @param object   socket to connect
 * @param ep       endpoint to connect to
 * @param stopOn   predicate stored in the operation (decayed copy)
 * @param token    completion token for the signature void(error_code, bool)
 * @param attempts attempt count passed to the operation
 * @param delay    per-attempt timer duration passed to the operation
 * @return whatever the async_result for `token` yields
 */
template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep, F&& stopOn,
                              Token&& token, int attempts = -1,
                              Timer::duration delay = 3s) {
    using Result     = asio::async_result<std::decay_t<Token>,
                                          void(error_code, bool)>;
    using Completion = typename Result::completion_handler_type;
    using Operation  = mylib::connection_attempt_op<std::decay_t<F>, Completion>;

    Completion completion(std::forward<Token>(token));
    Result     result(completion);

    // The unique_ptr is the owning "self" passed along a single async call chain.
    auto op = std::make_unique<Operation>(object, ep, std::forward<F>(stopOn),
                                          completion, attempts, delay);
    // Kick off via the Init overload; it takes ownership out of `op`.
    op->operator()(op);

    return result.get();
}
/// Stop predicate used by the demos: logs the error under consideration to
/// stderr and always answers "do not stop", so every failure is retried.
static auto non_recoverable = [](error_code ec) {
    // TODO Be specific about intermittent/recoverable conditions
    const bool stop_retrying = false;
    std::cerr << "Checking " << std::quoted(ec.message()) << "\n";
    return stop_retrying;
};
#include <set>
int main(int argc, char** argv) {
assert(argc>1);
static const tcp::endpoint ep{asio::ip::make_address(argv[1]),
8989};
std::set<std::string_view> const options{argv+1, argv+argc};
std::cout << std::boolalpha;
if (options.contains("future")) {
std::cout
<< "-----------------------\n"
<< " FUTURE DEMO\n"
<< "-----------------------" << std::endl;
asio::thread_pool ctx;
try {
tcp::socket s(ctx);
std::future<bool> ok = async_connection_attempt(
s, ep, non_recoverable, asio::use_future, 5, 800ms);
std::cout << "Future: " << ok.get() << ", " << s.is_open() << "\n";
} catch (system_error const& se) {
std::cout << "Future: " << se.code().message() << "\n";
}
ctx.join();
}
if (options.contains("coroutine")) {
std::cout
<< "-----------------------\n"
<< " COROUTINE DEMO\n"
<< "-----------------------" << std::endl;
asio::io_context ctx;
asio::spawn(ctx,
[work = make_work_guard(ctx)](asio::yield_context yc) {
auto ex = get_associated_executor(yc);
tcp::socket s(ex);
error_code ec;
if (async_connection_attempt(s, ep, non_recoverable,
yc[ec], 5, 800ms)) {
std::cout << "Connected in coro\n";
} else {
std::cout << "NOT Connected in coro: " << ec.message() << "\n";
}
});
ctx.run();
}
if (options.contains("callback")) {
std::cout
<< "-----------------------\n"
<< " CALLBACK DEMO\n"
<< "-----------------------" << std::endl;
asio::io_context ctx;
tcp::socket s(ctx);
async_connection_attempt(
s, ep, non_recoverable,
[](error_code ec, bool ok) {
std::cout << "Callback: " << ok << ", "
<< ec.message() << "\n";
},
5, 800ms);
ctx.run();
}
}
这是另一个不同场景的本地演示:
我写了一个 class,用于尝试与 TCP 服务器建立连接,并支持自定义超时时间和多次尝试。它是一个 Callable 对象,返回一个 std::future
作为结果。
我最初实施的问题是:
- 该对象必须一直存活,直到连接建立、尝试次数用尽,或发生需要停止的错误。因此我不得不把它存储在我的 class 中,而这正是我想避免的。
- asio 的组合操作允许自定义返回时的控制流:CompletionToken 可以是一个简单的回调、一个 future,也可以配合协程使用。而在我的实现中,用户只能使用 future。
这是我对具有自定义超时和尝试次数的连接尝试的初始实现:
template<typename Connection>
class connection_attempt
{
public:
using connection_type = Connection;
using endpoint_type = typename Connection::endpoint_type;
template<typename Endpoint>
using require_endpoint = typename std::enable_if<std::is_same<Endpoint, endpoint_type>::value>::type;
constexpr static auto default_timeout()
{
return std::chrono::milliseconds(3000);
}
constexpr static size_t infinite_attempts()
{
return size_t() - 1;
}
explicit connection_attempt(Connection &connection)
: connection_(connection)
{}
template<typename Callable>
explicit connection_attempt(Connection &connection,
Callable &&stopOnError)
: connection_(connection),
stopOnError_(std::forward<Callable>(stopOnError))
{}
template<typename Endpoint,
typename Duration,
typename = require_endpoint<Endpoint>>
std::future<bool> operator()(Endpoint &&endpoint,
size_t attempts,
Duration &&timeout = default_timeout())
{
connectionResult_ = {};
asyncConnect(std::forward<Endpoint>(endpoint),
attempts,
std::forward<Duration>(timeout));
return connectionResult_.get_future();
}
// default attempts = infinite_attempts
template<typename Endpoint,
typename Duration,
typename = require_endpoint<Endpoint>>
std::future<bool> operator()(Endpoint endpoint,
Duration &&timeout = default_timeout())
{
connectionResult_ = {};
asyncConnect(std::forward<Endpoint>(endpoint),
infinite_attempts(),
std::forward<Duration>(timeout));
return connectionResult_.get_future();
}
private:
connection_type &connection_;
asio::steady_timer timer_
{connection_.get_executor()}; // this does not compile -> {asio::get_associated_executor(connection_)};
std::function<bool(const asio::error_code &)> stopOnError_;
std::promise<bool> connectionResult_;
// cancels the connection on timeout!
template<typename Duration>
void startTimer(const Duration &timeout)
{
timer_.expires_after(timeout); // it will automatically cancel a pending timer
timer_.async_wait(
[this, timeout](const asio::error_code &errorCode)
{
// will occur on connection error before timeout
if (errorCode == asio::error::operation_aborted)
return;
// TODO: handle timer errors? What are the possible errors?
assert(!errorCode && "unexpected timer error!");
// stop current connection attempt
connection_.cancel();
});
}
void stopTimer()
{
timer_.cancel();
}
/**
* Will be trying to connect until:<br>
* - has run out of attempts
* - has been required to stop by stopOnError callback (if it was set)
* @param endpoint
* @param attempts
*/
template<typename Duration>
void asyncConnect(endpoint_type endpoint,
size_t attempts,
Duration &&timeout)
{
startTimer(timeout);
connection_.async_connect(endpoint, [this,
endpoint,
attempts,
timeout = std::forward<Duration>(timeout)](const asio::error_code &errorCode)
{
if (!errorCode)
{
stopTimer();
connectionResult_.set_value(true);
return;
}
const auto attemptsLeft = attempts == infinite_attempts() ?
infinite_attempts() :
attempts - 1;
if ((stopOnError_ &&
stopOnError_(errorCode == asio::error::operation_aborted ?
// special case for operation_aborted on timer expiration - need to send timed_out explicitly
// this should only be resulted from the timer calling cancel()
asio::error::timed_out :
errorCode)) ||
!attemptsLeft)
{
stopTimer();
connectionResult_.set_value(false);
return;
}
asyncConnect(endpoint,
attemptsLeft,
timeout);
});
}
};
// Factory for connection_attempt that deduces the connection type.
// (Ideally this would be an asynchronous function taking a custom CompletionToken.)
template<typename Connection,
         typename Callable>
auto make_connection_attempt(Connection &connection,
                             Callable &&stopOnError) -> connection_attempt<Connection>
{
    using attempt_type = connection_attempt<Connection>;
    return attempt_type(connection, std::forward<Callable>(stopOnError));
}
但是,我希望与 ASIO 以及 Universal Model for Asynchronous Operations 保持一致:返回时的控制流应该是可定制的。
我已经研究过一个 example:它使用带有状态的中间处理程序的组合操作,按固定间隔发送多条消息。该处理程序递归地把自身作为下一个异步操作(async_wait 和 async_write)的处理程序传递下去。这些调用总是轮流进行:总是在其中一个返回之后才调用另一个。然而,在我的场景中,async_wait 和 async_connect 是同时发起的:
// initiation method, called first
// (illustration only: `args...` is a placeholder, and the body shows why a
// single handler object cannot be handed to two concurrent operations)
void operator()(args...)
{
    // not valid!
    timer.async_wait(std::move(*this)); // from now on this is invalid
    connection.async_connect(endpoint, std::move(*this)); // can't move this twice
}
这是 class 我试图作为启动和中间处理程序实现的代码:
// Work-in-progress sketch of a composed operation that should act both as the
// initiation object and as the intermediate completion handler for a
// connection attempt with timeout. Both call operators below are still stubs.
template<typename Connection, typename CompletionToken>
class composed_connection_attempt
{
public:
using connection_type = Connection;
using endpoint_type = typename Connection::endpoint_type;
// Declared for tracking progress of the operation; only `pending` is used
// (as the initial value of state_) in the code shown here.
enum class state
{
pending,
connected,
timeout
};
// Timeout used when the caller does not supply one.
constexpr static auto default_timeout()
{
return std::chrono::milliseconds(3000);
}
// Sentinel attempt count (the maximum size_t value).
constexpr static size_t infinite_attempts()
{
return size_t() - 1;
}
// TODO: executor type
using executor_type = asio::associated_executor_t<CompletionToken,
typename connection_type::executor_type>;
// Currently returns the connection's executor rather than the one associated
// with the completion token.
executor_type get_executor() const noexcept
{
// TODO: get completion handler executor
return connection_.get_executor();
}
// TODO: allocator type
using allocator_type = typename asio::associated_allocator_t<CompletionToken,
std::allocator<void>>;
// Currently returns a default-constructed allocator.
allocator_type get_allocator() const noexcept
{
// TODO: get completion handler allocator
return allocator_type();
}
// TODO: constructor to initialize state, pass timeout value?
explicit composed_connection_attempt(connection_type &connection)
: connection_(connection)
{}
// Same as above, additionally storing the stop-on-error predicate.
template<typename Callable>
composed_connection_attempt(connection_type &connection, Callable &&stopOnError)
: connection_(connection),
stopOnError_(std::forward<Callable>(stopOnError))
{}
// operator for initiation
// (not implemented yet: the body contains only notes)
template<typename Endpoint, typename Duration>
void operator()(Endpoint &&endpoint,
size_t attempts,
Duration timeout = default_timeout())
{
// Start timer: how to pass this
// Attempt connection
}
// intermediate completion handler
// this may be invoked without an error both by the timer and a connection
// (not implemented yet: the success branch is empty)
void operator()(const asio::error_code &errorCode)
{
if (!errorCode)
{
}
}
private:
Connection &connection_;
// Timer is bound to whatever get_executor() returns at construction time.
asio::steady_timer timer_{this->get_executor()};
std::atomic<state> state_{state::pending};
std::function<bool(const asio::error_code &)> stopOnError_;
std::function<void(const asio::error_code &)> completionHandler_;
};
所以,我要解决的问题是:
- 如何让计时器和连接(套接字)共享这个有状态的中间处理程序的所有权?也许我必须使用嵌套的 class(主 class 用于启动,嵌套 class 用于处理计时器和套接字事件)?
- 如何确定是哪个异步调用触发了 void operator()(const asio::error_code&)?没有错误既可能是连接成功的结果,也可能是超时的结果。两种异步操作在被取消时也都可能返回 asio::error::operation_aborted:连接尝试会在超时时被取消,计时器会在连接成功或连接出错时被取消。
对于第二个问题,我建议使用一个用于区分的参数(有时我会使用空的“状态结构”,如 State::Init{} 或 State::Timeout{},它既有助于重载解析,也起到自我说明的作用)。
对于第一个问题,我确信您此后可能已经接触到了 std::enable_shared_from_this。
下面是我对“通用模型”的理解。为了便于说明,我使用了 spawn。
// Initiating function: tries to connect `object` to `ep`, giving each attempt
// `delay` before it is cancelled, and reports (error_code, bool) through the
// handler derived from `token`.
//   stopOn   - tested for truthiness and then called with the attempt's error;
//              a true result ends the operation with that error
//   attempts - number of attempts; the loop below runs `while (attempts--)`,
//              so the default of -1 does not reach zero by counting down
//   delay    - per-attempt timeout
// Returns whatever the async_result for `token` yields.
template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep,
F&& stopOn, Token&& token,
int attempts = -1,
Timer::duration delay = 3s)
{
using Result = asio::async_result<std::decay_t<Token>,
void(error_code, bool)>;
using Completion = typename Result::completion_handler_type;
// Build the concrete handler from the token, then the result object from it.
Completion completion(std::forward<Token>(token));
Result result(completion);
// The retry loop runs as a coroutine on the socket's executor; everything
// except the socket is captured by copy.
asio::spawn(
object.get_executor(),
[=, &object](asio::yield_context yc) mutable {
using mylib::result_code;
auto ex = get_associated_executor(yc);
error_code ec;
while (attempts--) {
// Fresh timer per attempt; it goes out of scope at the end of the iteration.
Timer t(ex, delay);
// Timer completed without error => cancel the pending connect.
t.async_wait([&](error_code ec) { if (!ec) object.cancel(); });
object.async_connect(ep, yc[ec]);
if(!ec)
return completion(result_code::ok, true);
// A cancelled connect is reported as a timeout.
if (ec == asio::error::operation_aborted) {
ec = result_code::timeout;
}
if (stopOn && stopOn(ec))
return completion(ec, false);
// Close the socket before the next attempt.
object.close();
}
return completion(result_code::attempts_exceeded, false);
});
return result.get();
}
需要注意的关键事项是:
- async_result<> 协议会为您提供一个完成处理程序,该处理程序能够“施展调用者所需的魔法”(use_future、yield_context 等)。
- 您应该可以不必共享引用,因为计时器“只需”持有一个原始指针:计时器的生命周期完全由包含它的组合操作拥有。
完整演示:回调、协程和 future
我加入了一个 mylib::result_code 枚举,以便能够返回完整的错误信息:
//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>
#include <iomanip>
#ifdef STANDALONE_ASIO
using std::error_category;
using std::error_code;
using std::error_condition;
using std::system_error;
#else
namespace asio = boost::asio;
using boost::system::error_category;
using boost::system::error_code;
using boost::system::error_condition;
using boost::system::system_error;
#endif
using namespace std::chrono_literals;
using asio::ip::tcp;
using Timer = asio::steady_timer;
namespace mylib { // threw in the kitchen sink for error codes
enum class result_code {
ok = 0,
timeout = 1,
attempts_exceeded = 2,
};
auto const& get_result_category() {
struct impl : error_category {
const char* name() const noexcept override { return "result_code"; }
std::string message(int ev) const override {
switch (static_cast<result_code>(ev)) {
case result_code::ok: return "success";
case result_code::attempts_exceeded:
return "the maximum number of attempts was exceeded";
case result_code::timeout:
return "the operation did not complete in time";
default: return "unknown error";
}
}
error_condition
default_error_condition(int ev) const noexcept override {
return error_condition{ev, *this};
}
bool equivalent(int ev, error_condition const& condition)
const noexcept override {
return condition.value() == ev && &condition.category() == this;
}
bool equivalent(error_code const& error,
int ev) const noexcept override {
return error.value() == ev && &error.category() == this;
}
} const static instance;
return instance;
}
error_code make_error_code(result_code se) {
return error_code{
static_cast<std::underlying_type<result_code>::type>(se),
get_result_category()};
}
} // namespace mylib
template <>
struct boost::system::is_error_code_enum<mylib::result_code>
: std::true_type {};
// Initiating function: tries to connect `object` to `ep`, giving each attempt
// `delay` before it is cancelled, and reports (error_code, bool) through the
// handler derived from `token`.
//   stopOn   - tested for truthiness and then called with the attempt's error;
//              a true result ends the operation with that error
//   attempts - number of attempts; the loop below runs `while (attempts--)`,
//              so the default of -1 does not reach zero by counting down
//   delay    - per-attempt timeout
// Returns whatever the async_result for `token` yields.
template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep,
F&& stopOn, Token&& token,
int attempts = -1,
Timer::duration delay = 3s)
{
using Result = asio::async_result<std::decay_t<Token>,
void(error_code, bool)>;
using Completion = typename Result::completion_handler_type;
// Build the concrete handler from the token, then the result object from it.
Completion completion(std::forward<Token>(token));
Result result(completion);
// The retry loop runs as a coroutine on the socket's executor; everything
// except the socket is captured by copy.
asio::spawn(
object.get_executor(),
[=, &object](asio::yield_context yc) mutable {
using mylib::result_code;
auto ex = get_associated_executor(yc);
error_code ec;
while (attempts--) {
// Fresh timer per attempt; it goes out of scope at the end of the iteration.
Timer t(ex, delay);
// Timer completed without error => cancel the pending connect.
t.async_wait([&](error_code ec) { if (!ec) object.cancel(); });
object.async_connect(ep, yc[ec]);
if(!ec)
return completion(result_code::ok, true);
// A cancelled connect is reported as a timeout.
if (ec == asio::error::operation_aborted) {
ec = result_code::timeout;
}
if (stopOn && stopOn(ec))
return completion(ec, false);
// Close the socket before the next attempt.
object.close();
}
return completion(result_code::attempts_exceeded, false);
});
return result.get();
}
// Stop predicate used by the demos: logs the error being examined and never
// asks the operation to stop (every error is treated as retryable).
static auto non_recoverable = [](error_code ec) -> bool {
    const std::string description = ec.message();
    std::cerr << "Checking " << std::quoted(description) << "\n";
    // TODO Be specific about intermittent/recoverable conditions
    return false;
};
#include <set>
// Demo driver.
// Usage: <program> <ip-address> [future] [coroutine] [callback]
// Connects to <ip-address>:8989 once per selected demo, each using a different
// completion token (std::future, stackful coroutine, plain callback), with at
// most 5 attempts of 800ms each.
int main(int argc, char** argv) {
    // The address argument is mandatory. Check it at run time: an assert
    // disappears under NDEBUG and argv[1] would then be a null pointer.
    if (argc < 2) {
        std::cerr << "Usage: " << (argc > 0 ? argv[0] : "demo")
                  << " <ip-address> [future] [coroutine] [callback]\n";
        return 1;
    }
    // static so that the lambdas below can use it without capturing
    static const tcp::endpoint ep{asio::ip::make_address(argv[1]),
                                  8989};
    std::set<std::string_view> const options{argv+1, argv+argc};
    std::cout << std::boolalpha;
    if (options.contains("future")) {
        std::cout
            << "-----------------------\n"
            << " FUTURE DEMO\n"
            << "-----------------------" << std::endl;
        asio::thread_pool ctx;
        try {
            tcp::socket s(ctx);
            std::future<bool> ok = async_connection_attempt(
                s, ep, non_recoverable, asio::use_future, 5, 800ms);
            // get() blocks until the operation completes and rethrows failures
            std::cout << "Future: " << ok.get() << ", " << s.is_open() << "\n";
        } catch (system_error const& se) {
            std::cout << "Future: " << se.code().message() << "\n";
        }
        ctx.join();
    }
    if (options.contains("coroutine")) {
        std::cout
            << "-----------------------\n"
            << " COROUTINE DEMO\n"
            << "-----------------------" << std::endl;
        asio::io_context ctx;
        asio::spawn(ctx,
            [work = make_work_guard(ctx)](asio::yield_context yc) {
                auto ex = get_associated_executor(yc);
                tcp::socket s(ex);
                error_code ec;
                // yc[ec] captures the error instead of throwing
                if (async_connection_attempt(s, ep, non_recoverable,
                                             yc[ec], 5, 800ms)) {
                    std::cout << "Connected in coro\n";
                } else {
                    std::cout << "NOT Connected in coro: " << ec.message() << "\n";
                }
            });
        ctx.run();
    }
    if (options.contains("callback")) {
        std::cout
            << "-----------------------\n"
            << " CALLBACK DEMO\n"
            << "-----------------------" << std::endl;
        asio::io_context ctx;
        tcp::socket s(ctx);
        async_connection_attempt(
            s, ep, non_recoverable,
            [](error_code ec, bool ok) {
                std::cout << "Callback: " << ok << ", "
                          << ec.message() << "\n";
            },
            5, 800ms);
        ctx.run();
    }
}
示例输出在在线编译器上,或者在我的机器上比较一些测试:
我就是这样实现的。可以找到带有测试的代码 here on github
// Composed operation that repeatedly tries to connect a connection with a
// per-attempt timeout. The object is both the initiation object (call
// operators taking an endpoint) and the intermediate completion handler for
// async_connect (call operator taking an error code). All state lives in a
// shared `impl`, so the handler itself can be moved into each async_connect
// while the timer handler keeps the state alive through its own pointer copy.
template<typename Connection, typename CompletionHandler>
class composed_connection_attempt
{
public:
using connection_type = Connection;
using endpoint_type = typename Connection::endpoint_type;
// TODO: clarify the type!
using completion_handler_t = CompletionHandler;
// Timeout used when the caller does not supply one.
constexpr static auto default_timeout()
{
return std::chrono::milliseconds(3000);
}
// Sentinel attempt count (the maximum size_t value); never decremented.
constexpr static size_t infinite_attempts()
{
return size_t() - 1;
}
using executor_type = asio::associated_executor_t<
typename std::decay<CompletionHandler>::type,
typename connection_type::executor_type>;
// Forwards to impl::get_executor().
executor_type get_executor() const noexcept
{
// TODO: get completion handler executor
return pImpl_->get_executor();
}
// TODO: allocator type
using allocator_type = typename asio::associated_allocator_t<CompletionHandler,
std::allocator<void>>;
// Forwards to impl::get_allocator(), which returns a default-constructed allocator.
allocator_type get_allocator() const noexcept
{
// TODO: get completion handler allocator
return pImpl_->get_allocator();
}
// TODO: constructor to initialize state, pass timeout value?
template<typename CompletionHandlerT>
explicit composed_connection_attempt(connection_type &connection,
CompletionHandlerT &&completionHandler)
: pImpl_(std::make_shared<impl>(connection,
std::forward<CompletionHandlerT>(completionHandler)))
{}
// Same as above, additionally storing the stop-on-error predicate.
template<typename CompletionHandlerT,
typename Callable>
explicit composed_connection_attempt(connection_type &connection,
CompletionHandlerT &&completionHandler,
Callable &&stopOnError)
: pImpl_(std::make_shared<impl>(connection,
std::forward<CompletionHandlerT>(completionHandler),
std::forward<Callable>(stopOnError)))
{}
/**
* Initiation operator. Initiates composed connection procedure.
* @tparam Endpoint type of endpoint
* @tparam Duration type of timeout
* @param endpoint endpoint to be used for connection
* @param attempts number of attempts
* @param timeout value to be used as a timeout between attempts
*/
// TODO: require endpoint type
// NOTE(review): Duration has no default template argument, so it presumably
// cannot be deduced when `timeout` is omitted - confirm whether the default
// argument is ever usable.
template<typename Endpoint, typename Duration>
void operator()(Endpoint &&endpoint,
size_t attempts,
Duration &&timeout = default_timeout())
{
pImpl_->endpoint_ = std::forward<Endpoint>(endpoint);
pImpl_->attempts_ = attempts;
pImpl_->timeout_ = std::forward<Duration>(timeout);
asyncConnect();
}
/**
* Initiation operator. Initiates composed connection procedure. Connection attempts default to infinite.
* @tparam Endpoint type of endpoint
* @tparam Duration type of timeout
* @param endpoint endpoint to be used for connection
* @param timeout value to be used as a timeout between attempts
*/
// TODO: require endpoint type
// Leaves impl::attempts_ untouched, i.e. at its in-class initial value of
// infinite_attempts() on a freshly constructed object.
template<typename Endpoint, typename Duration>
void operator()(Endpoint &&endpoint,
Duration &&timeout = default_timeout())
{
pImpl_->endpoint_ = std::forward<Endpoint>(endpoint);
pImpl_->timeout_ = std::forward<Duration>(timeout);
asyncConnect();
}
/**
* Intermediate completion handler. Will be trying to connect until:<br>
* - has connected<br>
* - has run out of attempts<br>
* - user-provided callback #impl::stopOnError_ interrupts execution when a specific connection error has occurred<br>
* <br>Will be invoked only on connection events:<br>
* - success<br>
* - connection timeout or operation_cancelled in case if timer has expired<br>
* - connection errors<br>
* @param errorCode error code resulted from async_connect
*/
void operator()(const asio::error_code &errorCode)
{
// Success: stop the timer and complete with the (empty) error code.
if (!errorCode)
{
stopTimer();
pImpl_->completionHandler_(errorCode);
return;
}
// The infinite sentinel is kept as is; any other count is decremented.
// NOTE(review): an initial count of 0 would wrap to the sentinel here - confirm intended.
const auto attemptsLeft = pImpl_->attempts_ == infinite_attempts() ?
infinite_attempts() :
pImpl_->attempts_ - 1;
if ((pImpl_->stopOnError_ &&
pImpl_->stopOnError_(errorCode == asio::error::operation_aborted ?
// special case for operation_aborted on timer expiration - need to send timed_out explicitly
// this should only be resulted from the timer calling cancel()
asio::error::timed_out :
errorCode)) ||
!attemptsLeft)
{
// Giving up: report the error, with operation_aborted translated to timed_out.
stopTimer();
pImpl_->completionHandler_(errorCode == asio::error::operation_aborted ?
asio::error::timed_out :
errorCode);
return;
}
pImpl_->attempts_ = attemptsLeft;
asyncConnect();
}
private:
// State shared between the connect handler (this object, moved into each
// async_connect) and the timer handler.
struct impl
{
template<typename CompletionHandlerT>
impl(connection_type &connection,
CompletionHandlerT &&completionHandler)
: connection_(connection),
completionHandler_(std::forward<CompletionHandlerT>(completionHandler))
{}
template<typename CompletionHandlerT, typename Callable>
impl(connection_type &connection,
CompletionHandlerT &&completionHandler,
Callable &&stopOnError)
: connection_(connection),
completionHandler_(std::forward<CompletionHandlerT>(completionHandler)),
stopOnError_(std::forward<Callable>(stopOnError))
{}
// Executor associated with the completion handler, falling back to the connection's.
executor_type get_executor() const noexcept
{
return asio::get_associated_executor(completionHandler_,
connection_.get_executor());
}
allocator_type get_allocator() const noexcept
{
// TODO: get completion handler allocator
return allocator_type();
}
connection_type &connection_;
completion_handler_t completionHandler_;
std::function<bool(const asio::error_code &)> stopOnError_;
// this should be default constructable or should I pass it in the constructor?
endpoint_type endpoint_;
// TODO: make timer initialization from get_executor()
asio::steady_timer timer_{connection_.get_executor()}; // this does not compile! -> {get_executor()};
asio::steady_timer::duration timeout_ = default_timeout();
size_t attempts_ = infinite_attempts();
};
// TODO: make unique?
std::shared_ptr<impl> pImpl_;
// cancels the connection on timeout!
void startTimer()
{
pImpl_->timer_.expires_after(pImpl_->timeout_); // it will automatically cancel a pending timer
// The handler holds its own copy of the shared pointer, so it does not
// depend on this (possibly moved-from) object.
pImpl_->timer_.async_wait(
[pImpl = pImpl_](const asio::error_code &errorCode)
{
// will occur on connection error before timeout
if (errorCode == asio::error::operation_aborted)
return;
// TODO: handle timer errors? What are the possible errors?
assert(!errorCode && "unexpected timer error!");
// stop attempts
pImpl->connection_.cancel();
});
}
void stopTimer()
{
pImpl_->timer_.cancel();
}
/**
* Will be trying to connect until:<br>
* - has run out of attempts
* - has been required to stop by stopOnError callback (if it was set)
* @param endpoint
* @param attempts
*/
// Arms the timer, then hands *this over to async_connect as its handler;
// this object must not be used after the call.
void asyncConnect()
{
startTimer();
pImpl_->connection_.async_connect(pImpl_->endpoint_, std::move(*this));
}
};
/**
 * Creates a composed_connection_attempt for the given connection.
 *
 * The handler type is decayed before it is used as the class template
 * argument. A forwarding reference deduces `H&` for an lvalue handler, and
 * without decaying the operation would store a reference to the caller's
 * handler instead of owning a copy.
 *
 * @param connection        connection to be connected
 * @param completionHandler handler invoked with the final error code
 * @param stopOnError       predicate deciding whether an error ends the retries
 * @return the operation object, ready to be invoked with endpoint/attempts/timeout
 */
template<typename Connection,
         typename CompletionHandler,
         typename Callable>
auto make_composed_connection_attempt(Connection &connection,
                                      CompletionHandler &&completionHandler,
                                      Callable &&stopOnError) ->
composed_connection_attempt<Connection, typename std::decay<CompletionHandler>::type>
{
    using handler_type = typename std::decay<CompletionHandler>::type;
    return composed_connection_attempt<Connection, handler_type>(
        connection,
        std::forward<CompletionHandler>(completionHandler),
        std::forward<Callable>(stopOnError));
}
template<typename Connection,
typename Endpoint,
typename Duration,
typename CompletionToken,
typename Callable>
auto async_connection_attempt(Connection &connection,
Endpoint &&endpoint,
size_t attempts,
Duration &&timeout,
CompletionToken &&completionToken,
Callable &&stopOnError)
{
using result_t = asio::async_result<std::decay_t<CompletionToken>,
void(asio::error_code)>;
using completion_t = typename result_t::completion_handler_type;
completion_t completion{std::forward<CompletionToken>(completionToken)};
result_t result{completion};
auto composedConnectionAttempt = make_composed_connection_attempt(connection,
std::forward<completion_t>(completion),
std::forward<Callable>(stopOnError));
composedConnectionAttempt(std::forward<Endpoint>(endpoint),
attempts,
std::forward<Duration>(timeout));
return result.get();
}
终于抽出时间来看这个了:
Wow. I've just created the same without using spawn (using an operation type that uses the State struct arguments as I mentioned). I must say the complexity if this kind of library-implementor-stuff keeps surprising me. I managed to avoid the overhead of shared_from_this though, and of course all demos still pass, so I'm pretty content. If you want I can post as an alternative answer. – sehe
启动函数
启动函数大致相同,只是不再使用spawn
(意味着用户不必选择加入 Boost Coroutine 和 Boost Context)。
// Initiating function (spawn-free variant): wraps the token's handler in a
// heap-allocated connection_attempt_op and starts it.
//   stopOn   - predicate forwarded to the operation
//   attempts - number of attempts (default -1)
//   delay    - per-attempt timeout (default 3s)
// Returns whatever the async_result for `token` yields.
template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep, F&& stopOn,
                              Token&& token, int attempts = -1,
                              Timer::duration delay = 3s) {
    using AsyncResult = asio::async_result<std::decay_t<Token>,
                                           void(error_code, bool)>;
    using Handler     = typename AsyncResult::completion_handler_type;
    using Operation   = mylib::connection_attempt_op<std::decay_t<F>, Handler>;

    Handler handler(std::forward<Token>(token));
    AsyncResult pending(handler);

    // The operation owns itself: this unique_ptr is handed along the single
    // asynchronous call chain by the operation's own call operators.
    auto op = std::make_unique<Operation>(object, ep, std::forward<F>(stopOn),
                                          handler, attempts, delay);
    (*op)(op);
    return pending.get();
}
现在,您会立即发现我使用了独占所有权的容器(unique_ptr)。我曾试图通过创建一个值语义的操作 class 来避免动态分配,该 class 以仅可移动的方式封装处理程序。
但是,操作对象同时也拥有计时器对象,而计时器对象需要在回调中保持引用稳定。所以,移动操作对象本身行不通。当然,我们仍然可以使用一个可移动的值类型操作对象,只让它包含一个用于 _timer 的 unique_ptr,但这样开销相同,而且不够通用:
- 如果我们向操作状态中再添加一个 IO 对象,就需要更多的动态分配
- 移动一个 unique_ptr 比移动一个有一定体积的状态对象要便宜得多
- 在成员函数中移动 this 指向的对象非常容易出错。例如,下面的写法会引发未定义行为:bind_executor(_ex, std::bind(std::move(*this), ...))。这是因为 _ex 实际上是 this->_ex,而求值顺序未定,所以 this->_ex 可能在移动之后才被求值。这是我们不应该留下的那种“搬起石头砸自己脚”的隐患。
- 如果我们要实现其他异步操作,可以沿用相同的模式。
操作 Class
您会从自己的原始代码中认出这一部分。我选择采用我自己的建议:通过标签“状态”类型来分派,从而选择 operator() 的重载:
// Empty tag types; each one selects a different operator() overload of the operation.
struct Init {};
struct Attempt {};
struct Response {};
为了帮助绑定到我们自己,我们还将拥有的 unique_ptr 作为 self
参数传递:
// Owning pointer to the operation itself.
using Self = std::unique_ptr<connection_attempt_op>;
// Callable wrapper that owns the operation and, when invoked, calls the
// operation's operator() with the owning pointer as first argument followed by
// the forwarded arguments.
struct Binder {
Self _self;
template <typename... Args>
decltype(auto) operator()(Args&&... args) {
return (*_self)(_self, std::forward<Args>(args)...);
}
};
由于 std::bind 的限制,我们无法传递真正的右值参数,但是:
- 只要我们注意保持调用链严格串行,就总能保证 self 恰好被移动一次
- 由于 unique_ptr 带来的间接性,在移动 self 之后,在方法体中继续使用 this 仍然是安全的
- 我们现在可以在 _timer.async_wait 的完成处理程序中捕获 this 的稳定值!只要我们保证该完成处理程序的存活时间不超过 self 的生命周期,就不必在这里共享所有权。这样就避免了对 shared_ptr 的依赖!
考虑到这些,我认为完整的实现不会有什么意外之处:
namespace mylib { // implementation details
    /**
     * Self-owning composed operation: tries to connect `_object` to `_ep`,
     * limiting each attempt to `_delay`, and finally invokes `_handler` with
     * (error_code, bool connected).
     *
     * Ownership travels along the call chain as a unique_ptr (`Self`) wrapped
     * in a `Binder`; the operation is destroyed when the last Binder holding
     * it goes away. Intermediate handlers are bound to the strand `_ex`.
     *
     * @tparam F          stop predicate; tested for truthiness and called with
     *                    the error of a failed attempt, true ends the operation
     * @tparam Completion final handler, callable as (error_code, bool)
     */
    template <typename F, typename Completion> struct connection_attempt_op {
        tcp::socket&    _object;
        tcp::endpoint   _ep;
        F               _stopOn;
        Completion      _handler;
        int             _attempts;
        Timer::duration _delay;

        using executor_type =
            asio::strand<std::decay_t<decltype(_object.get_executor())>>;
        executor_type          _ex;
        std::unique_ptr<Timer> _timer;
        // Liveness token of the attempt in flight. The timeout handler only
        // holds a weak reference; once the token is dropped (attempt finished
        // or operation destroyed) a late timeout handler becomes a no-op.
        std::shared_ptr<bool>  _armed;

        executor_type const& get_executor() { return _ex; }

        explicit connection_attempt_op(tcp::socket& object, tcp::endpoint ep,
                                       F stopOn, Completion handler,
                                       int attempts, Timer::duration delay)
            : _object(object),
              _ep(ep),
              _stopOn(std::move(stopOn)),
              _handler(std::move(handler)),
              _attempts(attempts),
              _delay(delay),
              _ex(object.get_executor()) {}

        // Tag types selecting the operator() overload for each step.
        struct Init {};
        struct Attempt {};
        struct Response {};

        using Self = std::unique_ptr<connection_attempt_op>;

        // Owns the operation and re-enters it with the owning pointer as the
        // first argument.
        struct Binder {
            Self _self;
            template <typename... Args>
            decltype(auto) operator()(Args&&... args) {
                return (*_self)(_self, std::forward<Args>(args)...);
            }
        };

        // Step 1: hop onto the strand and start the first attempt.
        void operator()(Self& self, Init = {}) {
            // This is the only invocation perhaps not yet on the strand, so
            // dispatch
            asio::dispatch(_ex, std::bind(Binder{std::move(self)}, Attempt{}));
        }

        // Step 2: start one attempt (timer + connect) or give up.
        void operator()(Self& self, Attempt) {
            if (_attempts--) {
                _timer = std::make_unique<Timer>(_ex, _delay);
                _armed = std::make_shared<bool>(true);
                // A wait that has already expired cannot be cancelled any
                // more: its handler may still run, with a success code, after
                // Response has finished and the operation has been destroyed.
                // Hence `this` is only touched while the token is alive.
                _timer->async_wait(
                    [this, armed = std::weak_ptr<bool>(_armed)](error_code ec) {
                        if (!ec && !armed.expired())
                            _object.cancel();
                    });
                _object.async_connect(
                    _ep,
                    asio::bind_executor(
                        _ex, // _object may not already have been on strand
                        std::bind(Binder{std::move(self)}, Response{},
                                  std::placeholders::_1)));
            } else {
                _handler(mylib::result_code::attempts_exceeded, false);
            }
        }

        // Step 3: the connect finished (success, failure or cancellation).
        void operator()(Self& self, Response, error_code ec) {
            // The attempt is over whatever the outcome: disarm the timeout
            // before anything else so that it cannot act on a later state.
            _armed.reset();
            _timer.reset();

            if (!ec)
                return _handler(result_code::ok, true);

            // A cancelled connect is reported as a timeout.
            if (ec == asio::error::operation_aborted) {
                ec = result_code::timeout;
            }
            if (_stopOn && _stopOn(ec))
                return _handler(ec, false);

            // Close the socket before the next attempt.
            _object.close();
            operator()(self, Attempt{});
        }
    };
}
Do note the executor binding; the comment in the Init{} overload as well as the one next to bind_executor are relevant here.
The strand is essential to maintaining the lifetime guarantees that we need with respect to the async_wait operation. In particular, we need the handler ordering to follow this:
演示时间
代码的其余部分与其他答案 100% 相同,所以让我们在没有进一步评论的情况下展示它:
//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
//#define BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>
#include <iomanip>
#ifdef STANDALONE_ASIO
using std::error_category;
using std::error_code;
using std::error_condition;
using std::system_error;
#else
namespace asio = boost::asio;
using boost::system::error_category;
using boost::system::error_code;
using boost::system::error_condition;
using boost::system::system_error;
#endif
using namespace std::chrono_literals;
using asio::ip::tcp;
using Timer = asio::steady_timer;
namespace mylib { // threw in the kitchen sink for error codes
enum class result_code {
ok = 0,
timeout = 1,
attempts_exceeded = 2,
not_implemented = 3,
};
auto const& get_result_category() {
struct impl : error_category {
const char* name() const noexcept override { return "result_code"; }
std::string message(int ev) const override {
switch (static_cast<result_code>(ev)) {
case result_code::ok: return "success";
case result_code::attempts_exceeded:
return "the maximum number of attempts was exceeded";
case result_code::timeout:
return "the operation did not complete in time";
case result_code::not_implemented:
return "feature not implemented";
default: return "unknown error";
}
}
error_condition
default_error_condition(int ev) const noexcept override {
return error_condition{ev, *this};
}
bool equivalent(int ev, error_condition const& condition)
const noexcept override {
return condition.value() == ev && &condition.category() == this;
}
bool equivalent(error_code const& error,
int ev) const noexcept override {
return error.value() == ev && &error.category() == this;
}
} const static instance;
return instance;
}
error_code make_error_code(result_code se) {
return error_code{
static_cast<std::underlying_type<result_code>::type>(se),
get_result_category()};
}
} // namespace mylib
template <>
struct boost::system::is_error_code_enum<mylib::result_code>
: std::true_type {};
namespace mylib { // implementation details
    /**
     * Self-owning composed operation: tries to connect `_object` to `_ep`,
     * limiting each attempt to `_delay`, and finally invokes `_handler` with
     * (error_code, bool connected).
     *
     * Ownership travels along the call chain as a unique_ptr (`Self`) wrapped
     * in a `Binder`; the operation is destroyed when the last Binder holding
     * it goes away. Intermediate handlers are bound to the strand `_ex`.
     *
     * @tparam F          stop predicate; tested for truthiness and called with
     *                    the error of a failed attempt, true ends the operation
     * @tparam Completion final handler, callable as (error_code, bool)
     */
    template <typename F, typename Completion> struct connection_attempt_op {
        tcp::socket&    _object;
        tcp::endpoint   _ep;
        F               _stopOn;
        Completion      _handler;
        int             _attempts;
        Timer::duration _delay;

        using executor_type =
            asio::strand<std::decay_t<decltype(_object.get_executor())>>;
        executor_type          _ex;
        std::unique_ptr<Timer> _timer;
        // Liveness token of the attempt in flight. The timeout handler only
        // holds a weak reference; once the token is dropped (attempt finished
        // or operation destroyed) a late timeout handler becomes a no-op.
        std::shared_ptr<bool>  _armed;

        executor_type const& get_executor() { return _ex; }

        explicit connection_attempt_op(tcp::socket& object, tcp::endpoint ep,
                                       F stopOn, Completion handler,
                                       int attempts, Timer::duration delay)
            : _object(object),
              _ep(ep),
              _stopOn(std::move(stopOn)),
              _handler(std::move(handler)),
              _attempts(attempts),
              _delay(delay),
              _ex(object.get_executor()) {}

        // Tag types selecting the operator() overload for each step.
        struct Init {};
        struct Attempt {};
        struct Response {};

        using Self = std::unique_ptr<connection_attempt_op>;

        // Owns the operation and re-enters it with the owning pointer as the
        // first argument.
        struct Binder {
            Self _self;
            template <typename... Args>
            decltype(auto) operator()(Args&&... args) {
                return (*_self)(_self, std::forward<Args>(args)...);
            }
        };

        // Step 1: hop onto the strand and start the first attempt.
        void operator()(Self& self, Init = {}) {
            // This is the only invocation perhaps not yet on the strand, so
            // dispatch
            asio::dispatch(_ex, std::bind(Binder{std::move(self)}, Attempt{}));
        }

        // Step 2: start one attempt (timer + connect) or give up.
        void operator()(Self& self, Attempt) {
            if (_attempts--) {
                _timer = std::make_unique<Timer>(_ex, _delay);
                _armed = std::make_shared<bool>(true);
                // A wait that has already expired cannot be cancelled any
                // more: its handler may still run, with a success code, after
                // Response has finished and the operation has been destroyed.
                // Hence `this` is only touched while the token is alive.
                _timer->async_wait(
                    [this, armed = std::weak_ptr<bool>(_armed)](error_code ec) {
                        if (!ec && !armed.expired())
                            _object.cancel();
                    });
                _object.async_connect(
                    _ep,
                    asio::bind_executor(
                        _ex, // _object may not already have been on strand
                        std::bind(Binder{std::move(self)}, Response{},
                                  std::placeholders::_1)));
            } else {
                _handler(mylib::result_code::attempts_exceeded, false);
            }
        }

        // Step 3: the connect finished (success, failure or cancellation).
        void operator()(Self& self, Response, error_code ec) {
            // The attempt is over whatever the outcome: disarm the timeout
            // before anything else so that it cannot act on a later state.
            _armed.reset();
            _timer.reset();

            if (!ec)
                return _handler(result_code::ok, true);

            // A cancelled connect is reported as a timeout.
            if (ec == asio::error::operation_aborted) {
                ec = result_code::timeout;
            }
            if (_stopOn && _stopOn(ec))
                return _handler(ec, false);

            // Close the socket before the next attempt.
            _object.close();
            operator()(self, Attempt{});
        }
    };
}
// Initiating function (spawn-free variant): wraps the token's handler in a
// heap-allocated connection_attempt_op and starts it.
//   stopOn   - predicate forwarded to the operation
//   attempts - number of attempts (default -1)
//   delay    - per-attempt timeout (default 3s)
// Returns whatever the async_result for `token` yields.
template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep, F&& stopOn,
                              Token&& token, int attempts = -1,
                              Timer::duration delay = 3s) {
    using AsyncResult = asio::async_result<std::decay_t<Token>,
                                           void(error_code, bool)>;
    using Handler     = typename AsyncResult::completion_handler_type;
    using Operation   = mylib::connection_attempt_op<std::decay_t<F>, Handler>;

    Handler handler(std::forward<Token>(token));
    AsyncResult pending(handler);

    // The operation owns itself: this unique_ptr is handed along the single
    // asynchronous call chain by the operation's own call operators.
    auto op = std::make_unique<Operation>(object, ep, std::forward<F>(stopOn),
                                          handler, attempts, delay);
    (*op)(op);
    return pending.get();
}
// Stop predicate used by the demos: logs the error being examined and never
// asks the operation to stop (every error is treated as retryable).
static auto non_recoverable = [](error_code ec) -> bool {
    const std::string description = ec.message();
    std::cerr << "Checking " << std::quoted(description) << "\n";
    // TODO Be specific about intermittent/recoverable conditions
    return false;
};
#include <set>
// Demo driver.
// Usage: <program> <ip-address> [future] [coroutine] [callback]
// Connects to <ip-address>:8989 once per selected demo, each using a different
// completion token (std::future, stackful coroutine, plain callback), with at
// most 5 attempts of 800ms each.
int main(int argc, char** argv) {
    // The address argument is mandatory. Check it at run time: an assert
    // disappears under NDEBUG and argv[1] would then be a null pointer.
    if (argc < 2) {
        std::cerr << "Usage: " << (argc > 0 ? argv[0] : "demo")
                  << " <ip-address> [future] [coroutine] [callback]\n";
        return 1;
    }
    // static so that the lambdas below can use it without capturing
    static const tcp::endpoint ep{asio::ip::make_address(argv[1]),
                                  8989};
    std::set<std::string_view> const options{argv+1, argv+argc};
    std::cout << std::boolalpha;
    if (options.contains("future")) {
        std::cout
            << "-----------------------\n"
            << " FUTURE DEMO\n"
            << "-----------------------" << std::endl;
        asio::thread_pool ctx;
        try {
            tcp::socket s(ctx);
            std::future<bool> ok = async_connection_attempt(
                s, ep, non_recoverable, asio::use_future, 5, 800ms);
            // get() blocks until the operation completes and rethrows failures
            std::cout << "Future: " << ok.get() << ", " << s.is_open() << "\n";
        } catch (system_error const& se) {
            std::cout << "Future: " << se.code().message() << "\n";
        }
        ctx.join();
    }
    if (options.contains("coroutine")) {
        std::cout
            << "-----------------------\n"
            << " COROUTINE DEMO\n"
            << "-----------------------" << std::endl;
        asio::io_context ctx;
        asio::spawn(ctx,
            [work = make_work_guard(ctx)](asio::yield_context yc) {
                auto ex = get_associated_executor(yc);
                tcp::socket s(ex);
                error_code ec;
                // yc[ec] captures the error instead of throwing
                if (async_connection_attempt(s, ep, non_recoverable,
                                             yc[ec], 5, 800ms)) {
                    std::cout << "Connected in coro\n";
                } else {
                    std::cout << "NOT Connected in coro: " << ec.message() << "\n";
                }
            });
        ctx.run();
    }
    if (options.contains("callback")) {
        std::cout
            << "-----------------------\n"
            << " CALLBACK DEMO\n"
            << "-----------------------" << std::endl;
        asio::io_context ctx;
        tcp::socket s(ctx);
        async_connection_attempt(
            s, ep, non_recoverable,
            [](error_code ec, bool ok) {
                std::cout << "Callback: " << ok << ", "
                          << ec.message() << "\n";
            },
            5, 800ms);
        ctx.run();
    }
}
这是另一个不同场景的本地演示: