使用 ASIO 将超时连接尝试作为组合操作

connection attempt with timout as a composed operation using ASIO

我写了一个 class 尝试与提供自定义超时和多次尝试的 TCP 服务器建立连接。它是一个 Callable 对象,return 是一个 std::future 作为结果。 我最初实施的问题是:

这是我对具有自定义超时和尝试次数的连接尝试的初始实现:

    template<typename Connection>
    class connection_attempt
    {
    public:
        using connection_type = Connection;
        using endpoint_type = typename Connection::endpoint_type;

        template<typename Endpoint>
        using require_endpoint = typename std::enable_if<std::is_same<Endpoint, endpoint_type>::value>::type;

        constexpr static auto default_timeout()
        {
            return std::chrono::milliseconds(3000);
        }

        constexpr static size_t infinite_attempts()
        {
            return size_t() - 1;
        }

        explicit connection_attempt(Connection &connection)
                : connection_(connection)
        {}

        template<typename Callable>
        explicit connection_attempt(Connection &connection,
                                    Callable &&stopOnError)
                : connection_(connection),
                  stopOnError_(std::forward<Callable>(stopOnError))
        {}

        template<typename Endpoint,
                typename Duration,
                typename = require_endpoint<Endpoint>>
        std::future<bool> operator()(Endpoint &&endpoint,
                                     size_t attempts,
                                     Duration &&timeout = default_timeout())
        {
            connectionResult_ = {};
            asyncConnect(std::forward<Endpoint>(endpoint),
                         attempts,
                         std::forward<Duration>(timeout));
            return connectionResult_.get_future();
        }

        // default attempts = infinite_attempts
        template<typename Endpoint,
                typename Duration,
                typename = require_endpoint<Endpoint>>
        std::future<bool> operator()(Endpoint endpoint,
                                     Duration &&timeout = default_timeout())
        {
            connectionResult_ = {};
            asyncConnect(std::forward<Endpoint>(endpoint),
                         infinite_attempts(),
                         std::forward<Duration>(timeout));
            return connectionResult_.get_future();
        }

    private:
        connection_type &connection_;
        asio::steady_timer timer_
                {connection_.get_executor()}; // this does not compile -> {asio::get_associated_executor(connection_)};

        std::function<bool(const asio::error_code &)> stopOnError_;
        std::promise<bool> connectionResult_;

        // cancels the connection on timeout!
        template<typename Duration>
        void startTimer(const Duration &timeout)
        {
            timer_.expires_after(timeout); // it will automatically cancel a pending timer

            timer_.async_wait(
                    [this, timeout](const asio::error_code &errorCode)
                    {
                        // will occur on connection error before timeout
                        if (errorCode == asio::error::operation_aborted)
                            return;

                        // TODO: handle timer errors? What are the possible errors?
                        assert(!errorCode && "unexpected timer error!");

                        // stop current connection attempt
                        connection_.cancel();
                    });
        }

        void stopTimer()
        {
            timer_.cancel();
        }

        /**
         * Will be trying to connect until:<br>
         * - has run out of attempts
         * - has been required to stop by stopOnError callback (if it was set)
         * @param endpoint
         * @param attempts
         */
        template<typename Duration>
        void asyncConnect(endpoint_type endpoint,
                          size_t attempts,
                          Duration &&timeout)
        {
            startTimer(timeout);

            connection_.async_connect(endpoint, [this,
                    endpoint,
                    attempts,
                    timeout = std::forward<Duration>(timeout)](const asio::error_code &errorCode)
            {
                if (!errorCode)
                {
                    stopTimer();
                    connectionResult_.set_value(true);
                    return;
                }
                
                const auto attemptsLeft = attempts == infinite_attempts() ?
                                          infinite_attempts() :
                                          attempts - 1;

                if ((stopOnError_ &&
                     stopOnError_(errorCode == asio::error::operation_aborted ?
                                  // special case for operation_aborted on timer expiration - need to send timed_out explicitly
                                  // this should only be resulted from the timer calling cancel()
                                  asio::error::timed_out :
                                  errorCode)) ||
                    !attemptsLeft)
                {
                    stopTimer();
                    connectionResult_.set_value(false);
                    return;
                }

                asyncConnect(endpoint,
                             attemptsLeft,
                             timeout);
            });
        }
    };

    // this should be an asynchornous function with a custom CompletionToken
    template<typename Connection,
            typename Callable>
    auto make_connection_attempt(Connection &connection,
                                 Callable &&stopOnError) -> connection_attempt<Connection>
    {
        return connection_attempt<Connection>(connection,
                                              std::forward<Callable>(stopOnError));
    }

但是,我希望使用 ASIO 和 Universal Model for Asynchronous Operations 保持一致:return 上的控制流应该是可定制的。 我已经完成了 example,使用带有状态中间处理程序的组合操作以间隔发送多条消息。处理程序递归地将自身作为每个下一个异步操作的处理程序传递:async_waitasync_write。这些调用总是轮流进行:总是在另一个 returned 时调用一个。然而,在我的例子中,async_waitasync_connect 被同时调用:

// initiation method, called first
void operator()(args...)
{
    // not valid!
    timer.async_wait(std::move(*this)); // from now on this is invalid
    connection.async_connect(endpoint, std::move(*this)); can't move this twice
}

这是 class 我试图作为启动和中间处理程序实现的代码:

template<typename Connection, typename CompletionToken>
    class composed_connection_attempt
    {
    public:
        using connection_type = Connection;
        using endpoint_type = typename Connection::endpoint_type;

        enum class state
        {
            pending,
            connected,
            timeout
        };

        constexpr static auto default_timeout()
        {
            return std::chrono::milliseconds(3000);
        }

        constexpr static size_t infinite_attempts()
        {
            return size_t() - 1;
        }

        // TODO: executor type
        using executor_type = asio::associated_executor_t<CompletionToken,
                typename connection_type::executor_type>;

        executor_type get_executor() const noexcept
        {
            // TODO: get completion handler executor
            return connection_.get_executor();
        }

        // TODO: allocator type
        using allocator_type = typename asio::associated_allocator_t<CompletionToken,
                std::allocator<void>>;

        allocator_type get_allocator() const noexcept
        {
            // TODO: get completion handler allocator
            return allocator_type();
        }

        // TODO: constructor to initialize state, pass timeout value?
        explicit composed_connection_attempt(connection_type &connection)
                : connection_(connection)
        {}

        template<typename Callable>
        composed_connection_attempt(connection_type &connection, Callable &&stopOnError)
                : connection_(connection),
                  stopOnError_(std::forward<Callable>(stopOnError))
        {}

        // operator for initiation
        template<typename Endpoint, typename Duration>
        void operator()(Endpoint &&endpoint,
                        size_t attempts,
                        Duration timeout = default_timeout())
        {
            // Start timer: how to pass this
            // Attempt connection
        }

        // intermediate completion handler
        // this may be invoked without an error both by the timer and a connection
        void operator()(const asio::error_code &errorCode)
        {
            if (!errorCode)
            {

            }
        }


    private:
        Connection &connection_;
        asio::steady_timer timer_{this->get_executor()};
        std::atomic<state> state_{state::pending};
        std::function<bool(const asio::error_code &)> stopOnError_;
        std::function<void(const asio::error_code &)> completionHandler_;
    };

所以,我要解决的问题:

  1. 如何与计时器和连接(套接字)共享有状态中间处理程序的所有权?也许我必须使用嵌套 classes(main class 用于启动,嵌套用于定时器和套接字事件)?
  2. 如何确定哪个异步调用导致了 void operator()(const asio::error_code&) 调用?没有错误可能是连接成功或超时的结果。两种异步操作也可以 return asio::error::operation_aborted 取消:连接尝试在超时时取消,计时器在成功或连接错误时取消。

因此,对于第二个问题,我提出了一个有区别的论点(有时我使用一个空的“状态结构”,如 State::Init{}State::Timeout{} 来帮助解决重载问题以及自我记录).

对于第一个问题,我确定您可能 运行 进入 std::enable_shared_from_this 因为。

这是我对“通用模型”的看法。为了便于说明,我使用了 spawn

template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep,
                              F&& stopOn, Token&& token,
                              int attempts = -1,
                              Timer::duration delay = 3s)
{
    using Result = asio::async_result<std::decay_t<Token>,
                                      void(error_code, bool)>;
    using Completion = typename Result::completion_handler_type;

    Completion completion(std::forward<Token>(token));
    Result     result(completion);

    asio::spawn(
        object.get_executor(),
        [=, &object](asio::yield_context yc) mutable {
            using mylib::result_code;
            auto ex = get_associated_executor(yc);
            error_code ec;

            while (attempts--) {
                Timer t(ex, delay);
                t.async_wait([&](error_code ec) { if (!ec) object.cancel(); });

                object.async_connect(ep, yc[ec]);

                if(!ec)
                    return completion(result_code::ok, true);

                if (ec == asio::error::operation_aborted) {
                    ec = result_code::timeout;
                }

                if (stopOn && stopOn(ec))
                    return completion(ec, false);

                object.close();
            }

            return completion(result_code::attempts_exceeded, false);
        });

    return result.get();
}

需要注意的关键事项是:

  • async_result<> 协议将为您提供一个完成处理程序,该处理程序可以“执行调用者所需的魔法”(use_future、yield_context 等)
  • 你应该能够在不共享引用的情况下逃脱,因为计时器可以“只是”有一个原始指针:计时器的生命周期完全由包含的组合操作拥有。

完整演示:回调、协程和期货

我加入了一个 mylib::result_code 枚举以便能够 return 完整的错误信息:

Live On Wandbox

//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>
#include <iomanip>

#ifdef STANDALONE_ASIO
    using std::error_category;
    using std::error_code;
    using std::error_condition;
    using std::system_error;
#else
    namespace asio = boost::asio;
    using boost::system::error_category;
    using boost::system::error_code;
    using boost::system::error_condition;
    using boost::system::system_error;
#endif

using namespace std::chrono_literals;
using asio::ip::tcp;
using Timer = asio::steady_timer;

namespace mylib { // threw in the kitchen sink for error codes
    enum class result_code {
        ok                = 0,
        timeout           = 1,
        attempts_exceeded = 2,
    };

    auto const& get_result_category() {
        struct impl : error_category {
            const char* name() const noexcept override { return "result_code"; }
            std::string message(int ev) const override {
                switch (static_cast<result_code>(ev)) {
                case result_code::ok: return "success";
                case result_code::attempts_exceeded:
                    return "the maximum number of attempts was exceeded";
                case result_code::timeout:
                    return "the operation did not complete in time";
                default: return "unknown error";
                }
            }
            error_condition
            default_error_condition(int ev) const noexcept override {
                return error_condition{ev, *this};
            }
            bool equivalent(int ev, error_condition const& condition)
                const noexcept override {
                return condition.value() == ev && &condition.category() == this;
            }
            bool equivalent(error_code const& error,
                            int ev) const noexcept override {
                return error.value() == ev && &error.category() == this;
            }
        } const static instance;
        return instance;
    }

    error_code make_error_code(result_code se) {
        return error_code{
            static_cast<std::underlying_type<result_code>::type>(se),
            get_result_category()};
    }
} // namespace mylib

template <>
struct boost::system::is_error_code_enum<mylib::result_code>
    : std::true_type {};

template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep,
                              F&& stopOn, Token&& token,
                              int attempts = -1,
                              Timer::duration delay = 3s)
{
    using Result = asio::async_result<std::decay_t<Token>,
                                      void(error_code, bool)>;
    using Completion = typename Result::completion_handler_type;

    Completion completion(std::forward<Token>(token));
    Result     result(completion);

    asio::spawn(
        object.get_executor(),
        [=, &object](asio::yield_context yc) mutable {
            using mylib::result_code;
            auto ex = get_associated_executor(yc);
            error_code ec;

            while (attempts--) {
                Timer t(ex, delay);
                t.async_wait([&](error_code ec) { if (!ec) object.cancel(); });

                object.async_connect(ep, yc[ec]);

                if(!ec)
                    return completion(result_code::ok, true);

                if (ec == asio::error::operation_aborted) {
                    ec = result_code::timeout;
                }

                if (stopOn && stopOn(ec))
                    return completion(ec, false);

                object.close();
            }

            return completion(result_code::attempts_exceeded, false);
        });

    return result.get();
}

static auto non_recoverable = [](error_code ec) {
    std::cerr << "Checking " << std::quoted(ec.message()) << "\n";
    // TODO Be specific about intermittent/recoverable conditions
    return false;
};

#include <set>
int main(int argc, char** argv) {
    assert(argc>1);
    static const tcp::endpoint ep{asio::ip::make_address(argv[1]),
                                  8989};

    std::set<std::string_view> const options{argv+1, argv+argc};

    std::cout << std::boolalpha;

    if (options.contains("future")) {
        std::cout
            << "-----------------------\n"
            << " FUTURE DEMO\n"
            << "-----------------------" << std::endl;
        asio::thread_pool ctx;

        try {
            tcp::socket s(ctx);

            std::future<bool> ok = async_connection_attempt(
                    s, ep, non_recoverable, asio::use_future, 5, 800ms);

            std::cout << "Future: " << ok.get() << ", " << s.is_open() << "\n";
        } catch (system_error const& se) {
            std::cout << "Future: " << se.code().message() << "\n";
        }

        ctx.join();
    }

    if (options.contains("coroutine")) {
        std::cout
            << "-----------------------\n"
            << " COROUTINE DEMO\n"
            << "-----------------------" << std::endl;
        asio::io_context ctx;

        asio::spawn(ctx,
            [work = make_work_guard(ctx)](asio::yield_context yc) {
                auto ex = get_associated_executor(yc);
                tcp::socket s(ex);

                error_code ec;
                if (async_connection_attempt(s, ep, non_recoverable,
                                             yc[ec], 5, 800ms)) {
                    std::cout << "Connected in coro\n";
                } else {
                    std::cout << "NOT Connected in coro: " << ec.message() << "\n";
                }
            });

        ctx.run();
    }

    if (options.contains("callback")) {
        std::cout
            << "-----------------------\n"
            << " CALLBACK DEMO\n"
            << "-----------------------" << std::endl;
        asio::io_context ctx;
        tcp::socket s(ctx);

        async_connection_attempt(
            s, ep, non_recoverable,
            [](error_code ec, bool ok) {
                std::cout << "Callback: " << ok << ", "
                          << ec.message() << "\n";
            },
            5, 800ms);

        ctx.run();
    }
}

示例输出在在线编译器上,或者在我的机器上比较一些测试:

我就是这样实现的。可以找到带有测试的代码 here on github

template<typename Connection, typename CompletionHandler>
    class composed_connection_attempt
    {
    public:
        using connection_type = Connection;
        using endpoint_type = typename Connection::endpoint_type;

        // TODO: clarify the type!
        using completion_handler_t = CompletionHandler;

        constexpr static auto default_timeout()
        {
            return std::chrono::milliseconds(3000);
        }

        constexpr static size_t infinite_attempts()
        {
            return size_t() - 1;
        }

        using executor_type = asio::associated_executor_t<
                typename std::decay<CompletionHandler>::type,
                typename connection_type::executor_type>;

        executor_type get_executor() const noexcept
        {
            // TODO: get completion handler executor
            return pImpl_->get_executor();
        }

        // TODO: allocator type
        using allocator_type = typename asio::associated_allocator_t<CompletionHandler,
                std::allocator<void>>;

        allocator_type get_allocator() const noexcept
        {
            // TODO: get completion handler allocator
            return pImpl_->get_allocator();
        }

        // TODO: constructor to initialize state, pass timeout value?
        template<typename CompletionHandlerT>
        explicit composed_connection_attempt(connection_type &connection,
                                             CompletionHandlerT &&completionHandler)
                : pImpl_(std::make_shared<impl>(connection,
                                                std::forward<CompletionHandlerT>(completionHandler)))
        {}


        template<typename CompletionHandlerT,
                typename Callable>
        explicit composed_connection_attempt(connection_type &connection,
                                             CompletionHandlerT &&completionHandler,
                                             Callable &&stopOnError)
                : pImpl_(std::make_shared<impl>(connection,
                                                std::forward<CompletionHandlerT>(completionHandler),
                                                std::forward<Callable>(stopOnError)))
        {}

        /**
         * Initiation operator. Initiates composed connection procedure.
         * @tparam Endpoint type of endpoint
         * @tparam Duration type of timeout
         * @param endpoint endpoint to be used for connection
         * @param attempts number of attempts
         * @param timeout value to be used as a timeout between attempts
         */
        // TODO: require endpoint type
        template<typename Endpoint, typename Duration>
        void operator()(Endpoint &&endpoint,
                        size_t attempts,
                        Duration &&timeout = default_timeout())
        {
            pImpl_->endpoint_ = std::forward<Endpoint>(endpoint);
            pImpl_->attempts_ = attempts;
            pImpl_->timeout_ = std::forward<Duration>(timeout);

            asyncConnect();
        }

        /**
         * Initiation operator. Initiates composed connection procedure. Connection attempts default to infinite.
         * @tparam Endpoint type of endpoint
         * @tparam Duration type of timeout
         * @param endpoint endpoint to be used for connection
         * @param timeout value to be used as a timeout between attempts
         */
        // TODO: require endpoint type
        template<typename Endpoint, typename Duration>
        void operator()(Endpoint &&endpoint,
                        Duration &&timeout = default_timeout())
        {
            pImpl_->endpoint_ = std::forward<Endpoint>(endpoint);
            pImpl_->timeout_ = std::forward<Duration>(timeout);

            asyncConnect();
        }

        /**
         * Intermediate completion handler. Will be trying to connect until:<br>
         * - has connected<br>
         * - has run out of attempts<br>
         * - user-provided callback #impl::stopOnError_ interrupts execution when a specific connection error has occurred<br>
         * <br>Will be invoked only on connection events:<br>
         * - success<br>
         * - connection timeout or operation_cancelled in case if timer has expired<br>
         * - connection errors<br>
         * @param errorCode error code resulted from async_connect
         */
        void operator()(const asio::error_code &errorCode)
        {
            if (!errorCode)
            {
                stopTimer();
                pImpl_->completionHandler_(errorCode);
                return;
            }

            const auto attemptsLeft = pImpl_->attempts_ == infinite_attempts() ?
                                      infinite_attempts() :
                                      pImpl_->attempts_ - 1;

            if ((pImpl_->stopOnError_ &&
                 pImpl_->stopOnError_(errorCode == asio::error::operation_aborted ?
                                      // special case for operation_aborted on timer expiration - need to send timed_out explicitly
                                      // this should only be resulted from the timer calling cancel()
                                      asio::error::timed_out :
                                      errorCode)) ||
                !attemptsLeft)
            {
                stopTimer();
                pImpl_->completionHandler_(errorCode == asio::error::operation_aborted ?
                                           asio::error::timed_out :
                                           errorCode);
                return;
            }

            pImpl_->attempts_ = attemptsLeft;
            asyncConnect();
        }

    private:

        struct impl
        {
            template<typename CompletionHandlerT>
            impl(connection_type &connection,
                 CompletionHandlerT &&completionHandler)
                    : connection_(connection),
                      completionHandler_(std::forward<CompletionHandlerT>(completionHandler))
            {}

            template<typename CompletionHandlerT, typename Callable>
            impl(connection_type &connection,
                 CompletionHandlerT &&completionHandler,
                 Callable &&stopOnError)
                    : connection_(connection),
                      completionHandler_(std::forward<CompletionHandlerT>(completionHandler)),
                      stopOnError_(std::forward<Callable>(stopOnError))
            {}

            executor_type get_executor() const noexcept
            {
                return asio::get_associated_executor(completionHandler_,
                                                     connection_.get_executor());
            }

            allocator_type get_allocator() const noexcept
            {
                // TODO: get completion handler allocator
                return allocator_type();
            }

            connection_type &connection_;
            completion_handler_t completionHandler_;
            std::function<bool(const asio::error_code &)> stopOnError_;

            // this should be default constructable or should I pass it in the constructor?
            endpoint_type endpoint_;

            // TODO: make timer initialization from get_executor()
            asio::steady_timer timer_{connection_.get_executor()}; // this does not compile! -> {get_executor()};
            asio::steady_timer::duration timeout_ = default_timeout();
            size_t attempts_ = infinite_attempts();
        };

        // TODO: make unique?
        std::shared_ptr<impl> pImpl_;

        // cancels the connection on timeout!
        void startTimer()
        {
            pImpl_->timer_.expires_after(pImpl_->timeout_); // it will automatically cancel a pending timer
            pImpl_->timer_.async_wait(
                    [pImpl = pImpl_](const asio::error_code &errorCode)
                    {
                        // will occur on connection error before timeout
                        if (errorCode == asio::error::operation_aborted)
                            return;

                        // TODO: handle timer errors? What are the possible errors?
                        assert(!errorCode && "unexpected timer error!");

                        // stop attempts
                        pImpl->connection_.cancel();
                    });
        }

        void stopTimer()
        {
            pImpl_->timer_.cancel();
        }

        /**
         * Will be trying to connect until:<br>
         * - has run out of attempts
         * - has been required to stop by stopOnError callback (if it was set)
         * @param endpoint
         * @param attempts
         */
        void asyncConnect()
        {
            startTimer();
            pImpl_->connection_.async_connect(pImpl_->endpoint_, std::move(*this));
        }
    };

    template<typename Connection,
            typename CompletionHandler,
            typename Callable>
    auto make_composed_connection_attempt(Connection &connection,
                                          CompletionHandler &&completionHandler,
                                          Callable &&stopOnError) ->
    composed_connection_attempt<Connection, CompletionHandler>
    {
        return composed_connection_attempt<Connection, CompletionHandler>(connection,
                                                                          std::forward<CompletionHandler>(
                                                                                  completionHandler),
                                                                          std::forward<Callable>(stopOnError));
    }

    template<typename Connection,
            typename Endpoint,
            typename Duration,
            typename CompletionToken,
            typename Callable>
    auto async_connection_attempt(Connection &connection,
                                  Endpoint &&endpoint,
                                  size_t attempts,
                                  Duration &&timeout,
                                  CompletionToken &&completionToken,
                                  Callable &&stopOnError)
    {

        using result_t = asio::async_result<std::decay_t<CompletionToken>,
                void(asio::error_code)>;
        using completion_t = typename result_t::completion_handler_type;

        completion_t completion{std::forward<CompletionToken>(completionToken)};
        result_t result{completion};

        auto composedConnectionAttempt = make_composed_connection_attempt(connection,
                                                                          std::forward<completion_t>(completion),
                                                                          std::forward<Callable>(stopOnError));
        composedConnectionAttempt(std::forward<Endpoint>(endpoint),
                                  attempts,
                                  std::forward<Duration>(timeout));

        return result.get();
    }

终于抽出时间来看这个了:

Wow. I've just created the same without using spawn (using an operation type that uses the State struct arguments as I mentioned). I must say the complexity if this kind of library-implementor-stuff keeps surprising me. I managed to avoid the overhead of shared_from_this though, and of course all demos still pass, so I'm pretty content. If you want I can post as an alternative answer. – sehe

启动函数

启动函数大致相同,只是不再使用spawn(意味着用户不必选择加入 Boost Coroutine 和 Boost Context)。

template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep, F&& stopOn,
                              Token&& token, int attempts = -1,
                              Timer::duration delay = 3s) {
    using Result = asio::async_result<std::decay_t<Token>,
                                      void(error_code, bool)>;
    using Completion = typename Result::completion_handler_type;

    Completion completion(std::forward<Token>(token));
    Result     result(completion);

    using Op = mylib::connection_attempt_op<std::decay_t<F>, Completion>;
    // make an owning self, to be passed along a single async call chain
    auto self = std::make_unique<Op>(object, ep, std::forward<F>(stopOn), completion, attempts, delay);
    (*self)(self);

    return result.get();
}

现在,您会立即发现我使用了唯一所有权容器 (unique_ptr)。我试图通过创建一个值语义操作 class 来避免动态分配,该操作以仅移动的方式封装处理程序。

但是,操作 拥有计时器对象,该对象需要在回调中保持引用稳定。所以,搬家不是办法。当然,我们仍然可以有一个 包含 的可移动值操作类型,只有一个 unique_ptr 用于 _timer,但这是相同的开销并且不太通用.

  • 如果我们将另一个 IO 对象添加到操作状态,我们将需要更动态的分配

  • 移动一个 unique_ptr 比一个大小

    的状态对象要便宜得多
  • 在成员函数中移动 this 指向的对象 非常 容易出错。例如,这将调用未定义的行为:

    bind_executor(_ex, std::bind(std::move(*this), ...))
    

    那是因为 _ex 实际上是 this->_ex 但计算没有顺序,所以 this->_ex 可能在移动之后计算。

    这是我们不应该想要的那种脚枪。

  • 如果我们实现其他异步操作,我们可以使用相同的模式。

操作Class

您将从您的原始代码中识别出这一点。我选择使用我自己的建议来 select operator() 通过调度标记“状态”类型的重载:

struct Init {};
struct Attempt {};
struct Response {};

为了帮助绑定到我们自己,我们还将拥有的 unique_ptr 作为 self 参数传递:

using Self = std::unique_ptr<connection_attempt_op>;
struct Binder {
    Self _self;
    template <typename... Args>
    decltype(auto) operator()(Args&&... args) {
        return (*_self)(_self, std::forward<Args>(args)...);
    }
};

由于 std::bind 的限制,我们无法传递实际的右值参数,但是

  • 如果我们注意调用链是严格顺序的,我们总是可以从 self 恰好移动一次

  • 由于 unique_ptr 的间接性,在移动 self

    后从方法体中使用 this 仍然是安全的
  • 我们现在可以在 _timer.async_wait 的完成处理程序中捕获 this 的稳定值!只要我们保证完成处理程序不会超过 self 的生命周期,我们就不必在这里共享所有权。

    shared_ptr dependence averted!

考虑到这些,我认为完整的实施没有什么惊喜:

namespace mylib { // implementation details

    template <typename F, typename Completion> struct connection_attempt_op {
        tcp::socket&    _object;
        tcp::endpoint   _ep;
        F               _stopOn;
        Completion      _handler;
        int             _attempts;
        Timer::duration _delay;

        using executor_type =
            asio::strand<std::decay_t<decltype(_object.get_executor())>>;
        executor_type          _ex;
        std::unique_ptr<Timer> _timer;

        executor_type const& get_executor() { return _ex; }

        explicit connection_attempt_op(tcp::socket& object, tcp::endpoint ep,
                                       F stopOn, Completion handler,
                                       int attempts, Timer::duration delay)
            : _object(object),
              _ep(ep),
              _stopOn(std::move(stopOn)),
              _handler(std::move(handler)),
              _attempts(attempts),
              _delay(delay),
              _ex(object.get_executor()) {}

        struct Init {};
        struct Attempt {};
        struct Response {};

        using Self = std::unique_ptr<connection_attempt_op>;
        struct Binder {
            Self _self;
            template <typename... Args>
            decltype(auto) operator()(Args&&... args) {
                return (*_self)(_self, std::forward<Args>(args)...);
            }
        };

        void operator()(Self& self, Init = {}) {
            // This is the only invocation perhaps not yet on the strand, so
            // dispatch
            asio::dispatch(_ex, std::bind(Binder{std::move(self)}, Attempt{}));
        }

        void operator()(Self& self, Attempt) {
            if (_attempts--) {
                _timer = std::make_unique<Timer>(_ex, _delay);
                _timer->async_wait([this](error_code ec) {
                    if (!ec) _object.cancel();
                });
                _object.async_connect(
                    _ep,
                    asio::bind_executor(
                        _ex, // _object may not already have been on strand
                        std::bind(Binder{std::move(self)}, Response{},
                                  std::placeholders::_1)));
            } else {
                _handler(mylib::result_code::attempts_exceeded, false);
            }
        }

        void operator()(Self& self, Response, error_code ec) {
            if (!ec) {
                _timer.reset();
                return _handler(result_code::ok, true);
            }

            if (ec == asio::error::operation_aborted) {
                ec = result_code::timeout;
            }

            if (_stopOn && _stopOn(ec))
                return _handler(ec, false);

            _timer.reset();
            _object.close();

            operator()(self, Attempt{});
        }
    };
}

Do note the executor binding; the comment in the Init{} overload as well as with the bind_executor are relevant here.

The strand is essential to maintaining the lifetime guarantees that we needed w.r.t. the async_wait operation. In particular we need the handler ordering follow this

演示时间

代码的其余部分与其他答案 100% 相同,所以让我们在没有进一步评论的情况下展示它:

Live On Wandbox

//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
//#define BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>
#include <iomanip>

#ifdef STANDALONE_ASIO
    using std::error_category;
    using std::error_code;
    using std::error_condition;
    using std::system_error;
#else
    namespace asio = boost::asio;
    using boost::system::error_category;
    using boost::system::error_code;
    using boost::system::error_condition;
    using boost::system::system_error;
#endif

using namespace std::chrono_literals;
using asio::ip::tcp;
using Timer = asio::steady_timer;

namespace mylib { // threw in the kitchen sink for error codes
    enum class result_code {
        ok                = 0,
        timeout           = 1,
        attempts_exceeded = 2,
        not_implemented   = 3,
    };

    auto const& get_result_category() {
        struct impl : error_category {
            const char* name() const noexcept override { return "result_code"; }
            std::string message(int ev) const override {
                switch (static_cast<result_code>(ev)) {
                case result_code::ok: return "success";
                case result_code::attempts_exceeded:
                    return "the maximum number of attempts was exceeded";
                case result_code::timeout:
                    return "the operation did not complete in time";
                case result_code::not_implemented:
                    return "feature not implemented";
                default: return "unknown error";
                }
            }
            error_condition
            default_error_condition(int ev) const noexcept override {
                return error_condition{ev, *this};
            }
            bool equivalent(int ev, error_condition const& condition)
                const noexcept override {
                return condition.value() == ev && &condition.category() == this;
            }
            bool equivalent(error_code const& error,
                            int ev) const noexcept override {
                return error.value() == ev && &error.category() == this;
            }
        } const static instance;
        return instance;
    }

    error_code make_error_code(result_code se) {
        return error_code{
            static_cast<std::underlying_type<result_code>::type>(se),
            get_result_category()};
    }

} // namespace mylib

template <>
struct boost::system::is_error_code_enum<mylib::result_code>
    : std::true_type {};

namespace mylib { // implementation details

    template <typename F, typename Completion> struct connection_attempt_op {
        tcp::socket&    _object;
        tcp::endpoint   _ep;
        F               _stopOn;
        Completion      _handler;
        int             _attempts;
        Timer::duration _delay;

        using executor_type =
            asio::strand<std::decay_t<decltype(_object.get_executor())>>;
        executor_type          _ex;
        std::unique_ptr<Timer> _timer;

        executor_type const& get_executor() { return _ex; }

        explicit connection_attempt_op(tcp::socket& object, tcp::endpoint ep,
                                       F stopOn, Completion handler,
                                       int attempts, Timer::duration delay)
            : _object(object),
              _ep(ep),
              _stopOn(std::move(stopOn)),
              _handler(std::move(handler)),
              _attempts(attempts),
              _delay(delay),
              _ex(object.get_executor()) {}

        struct Init {};
        struct Attempt {};
        struct Response {};

        using Self = std::unique_ptr<connection_attempt_op>;
        struct Binder {
            Self _self;
            template <typename... Args>
            decltype(auto) operator()(Args&&... args) {
                return (*_self)(_self, std::forward<Args>(args)...);
            }
        };

        void operator()(Self& self, Init = {}) {
            // This is the only invocation perhaps not yet on the strand, so
            // dispatch
            asio::dispatch(_ex, std::bind(Binder{std::move(self)}, Attempt{}));
        }

        void operator()(Self& self, Attempt) {
            if (_attempts--) {
                _timer = std::make_unique<Timer>(_ex, _delay);
                _timer->async_wait([this](error_code ec) {
                    if (!ec) _object.cancel();
                });
                _object.async_connect(
                    _ep,
                    asio::bind_executor(
                        _ex, // _object may not already have been on strand
                        std::bind(Binder{std::move(self)}, Response{},
                                  std::placeholders::_1)));
            } else {
                _handler(mylib::result_code::attempts_exceeded, false);
            }
        }

        void operator()(Self& self, Response, error_code ec) {
            if (!ec) {
                _timer.reset();
                return _handler(result_code::ok, true);
            }

            if (ec == asio::error::operation_aborted) {
                ec = result_code::timeout;
            }

            if (_stopOn && _stopOn(ec))
                return _handler(ec, false);

            _timer.reset();
            _object.close();

            operator()(self, Attempt{});
        }
    };
}

template <typename F, typename Token>
auto async_connection_attempt(tcp::socket& object, tcp::endpoint ep, F&& stopOn,
                              Token&& token, int attempts = -1,
                              Timer::duration delay = 3s) {
    using Result = asio::async_result<std::decay_t<Token>,
                                      void(error_code, bool)>;
    using Completion = typename Result::completion_handler_type;

    Completion completion(std::forward<Token>(token));
    Result     result(completion);

    using Op = mylib::connection_attempt_op<std::decay_t<F>, Completion>;
    // make an owning self, to be passed along a single async call chain
    auto self = std::make_unique<Op>(object, ep, std::forward<F>(stopOn), completion, attempts, delay);
    (*self)(self);

    return result.get();
}

static auto non_recoverable = [](error_code ec) {
    std::cerr << "Checking " << std::quoted(ec.message()) << "\n";
    // TODO Be specific about intermittent/recoverable conditions
    return false;
};

#include <set>
int main(int argc, char** argv) {
    assert(argc>1);
    static const tcp::endpoint ep{asio::ip::make_address(argv[1]),
                                  8989};

    std::set<std::string_view> const options{argv+1, argv+argc};

    std::cout << std::boolalpha;

    if (options.contains("future")) {
        std::cout
            << "-----------------------\n"
            << " FUTURE DEMO\n"
            << "-----------------------" << std::endl;
        asio::thread_pool ctx;

        try {
            tcp::socket s(ctx);

            std::future<bool> ok = async_connection_attempt(
                    s, ep, non_recoverable, asio::use_future, 5, 800ms);

            std::cout << "Future: " << ok.get() << ", " << s.is_open() << "\n";
        } catch (system_error const& se) {
            std::cout << "Future: " << se.code().message() << "\n";
        }

        ctx.join();
    }

    if (options.contains("coroutine")) {
        std::cout
            << "-----------------------\n"
            << " COROUTINE DEMO\n"
            << "-----------------------" << std::endl;
        asio::io_context ctx;

        asio::spawn(ctx,
            [work = make_work_guard(ctx)](asio::yield_context yc) {
                auto ex = get_associated_executor(yc);
                tcp::socket s(ex);

                error_code ec;
                if (async_connection_attempt(s, ep, non_recoverable,
                                             yc[ec], 5, 800ms)) {
                    std::cout << "Connected in coro\n";
                } else {
                    std::cout << "NOT Connected in coro: " << ec.message() << "\n";
                }
            });

        ctx.run();
    }

    if (options.contains("callback")) {
        std::cout
            << "-----------------------\n"
            << " CALLBACK DEMO\n"
            << "-----------------------" << std::endl;
        asio::io_context ctx;
        tcp::socket s(ctx);

        async_connection_attempt(
            s, ep, non_recoverable,
            [](error_code ec, bool ok) {
                std::cout << "Callback: " << ok << ", "
                          << ec.message() << "\n";
            },
            5, 800ms);

        ctx.run();
    }
}

这是另一个不同场景的本地演示: