如何在组合函数中使用 boost::asio::defer() ?

how to use boost::asio::defer() in composing function?

在 Boost 1.66 上,Asio 具有 deprecated the asio_handler_is_continuation hook function, promoting usage of defer function. It seems that defer 功能,当 asio_handler_is_continuation==true 时,其行为与 post 完全相同。但是,defer的使用方式和asio_handler_is_continuation的使用方式不同,不知道如何正确使用defer

编辑:我认为下面的示例过于冗长,无法清楚地表达我的意思。这是较短的示例:

async_read_until(stream, read_buffer, "\r\n", 
    [](boost::system::error_code ec, std::size_t bytes_transferred)
    {
        if(!ec)
            async_write(stream, write_buffer, some_handler);
    })

现在,当 async_read_until 完成时,传递的 lambda 处理程序将使用某种等同于 boost::asio::post 的方式被调用。但是 lambda 处理程序内部的 async_write 是上一个异步任务的延续,所以我想使用 defer 调用 lambda 处理程序来进行优化。

有什么方法可以使用 defer(而不是 post)来调用上面示例中的 lambda 处理程序吗?

原始 POST:我正在尝试编写一个类似于 beast document 中的简单启动函数 async_echo,除了调用 boost::asio::async_write 的部分将被称为延续。为此,之前的中间操作 boost::asio::async_read_until 必须调用处理程序 *this 作为延续。

这是我在野兽文档的async_echo示例中所指的部分:

template<class AsyncStream, class Handler>
void echo_op<AsyncStream, Handler>::
operator()(boost::beast::error_code ec, std::size_t bytes_transferred)
{
    // Store a reference to our state. The address of the state won't
    // change, and this solves the problem where dereferencing the
    // data member is undefined after a move.
    auto& p = *p_;

    // Now perform the next step in the state machine
    switch(ec ? 2 : p.step)
    {
        // initial entry
        case 0:
            // read up to the first newline
            p.step = 1;
            return boost::asio::async_read_until(p.stream, p.buffer, "\r", std::move(*this));

        case 1:
            // write everything back
            p.step = 2;
            // async_read_until could have read past the newline,
            // use buffers_prefix to make sure we only send one line
            return boost::asio::async_write(p.stream,
                boost::beast::buffers_prefix(bytes_transferred, p.buffer.data()), std::move(*this));

        case 2:
            p.buffer.consume(bytes_transferred);
            break;
    }

    // Invoke the final handler. The implementation of `handler_ptr`
    // will deallocate the storage for the state before the handler
    // is invoked. This is necessary to provide the
    // destroy-before-invocation guarantee on handler memory
    // customizations.
    //
    // If we wanted to pass any arguments to the handler which come
    // from the `state`, they would have to be moved to the stack
    // first or else undefined behavior results.
    //
    p_.invoke(ec);
    return;
}

在 1.66 之前的日子里,我可以简单地 hook 函数如下:

template <Function, Handler>
friend bool asio_handler_is_continuation(echo_op<Function, Handler>* handler)
{
    using boost::asio::asio_handler_is_continuation;
    return handler.p_->step == 1 || 
        asio_handler_is_continuation(std::addressof(handler.p_->handler()));
}

echo_op 的声明中。

从 Boost 1.66 开始,上面的代码不太可能有任何效果(没有 BOOST_ASIO_NO_DEPRECATION 宏)。所以我应该使用 defer

但由于 boost::asio::async_read_untila guarantee 那 "Invocation of the handler will be performed in a manner equivalent to using boost::asio::io_context::post().", *this 将不会使用 defer 调用,即作为延续。

是否有任何解决方法可以使 boost::asio::async_read_until 使用 defer 调用处理程序?有没有利用 defer 函数的好例子?

这也是我过去的困惑。

Executor::deferExecutor::post 都执行相同的操作,除了这个注释:

Note: Although the requirements placed on defer are identical to post, the use of post conveys a preference that the caller does not block the first step of f1's progress, whereas defer conveys a preference that the caller does block the first step of f1. One use of defer is to convey the intention of the caller that f1 is a continuation of the current call context. The executor may use this information to optimize or otherwise adjust the way in which f1 is invoked. —end note

https://www.boost.org/doc/libs/1_67_0/doc/html/boost_asio/reference/Executor1.html

因此,链接延续的责任似乎已成为 Executor 模型的实现细节。

据我所知,这意味着您需要做的就是调用自由函数 defer(executor, handler),执行程序将 'do the right thing'

更新:

找到一些说明如何通过最终执行程序链接处理程序的文档:

文档来源:https://github.com/chriskohlhoff/asio-tr2/blob/master/doc/executors.qbk

示例:https://github.com/chriskohlhoff/executors/blob/v0.2-branch/src/examples/executor/async_op_2.cpp

参见 async_op_2.cpp

中的第 38+ 行

玩了一会儿后,发现 asio_handler_is_continuation 没有被弃用;目前无法用 defer 替换它。

为了将任何 post 调用重定向到 defer,我提供了以下自定义执行程序:

template<typename UnderlyingExecutor, typename std::enable_if<boost::asio::is_executor<UnderlyingExecutor>::value, int>::type = 0>
class continuation_executor
{
    private:
        UnderlyingExecutor _ex;

    public:

        continuation_executor(UnderlyingExecutor ex)
            :_ex(ex){}

        template<class Function, class Allocator>
        void post(Function f, Allocator a)
        {
            std::cout<<"Redirected to defer()"<<std::endl;
            _ex.defer(BOOST_ASIO_MOVE_CAST(Function)(f),a);
        }

        template<class Function, class Allocator>
        void defer(Function f, Allocator a)
        {
            std::cout<<"defer() called"<<std::endl;
            _ex.defer(BOOST_ASIO_MOVE_CAST(Function)(f),a);
        }

        template<class Function, class Allocator>
        void dispatch(Function f, Allocator a)
        {
            std::cout<<"dispatch() called"<<std::endl;
            _ex.dispatch(BOOST_ASIO_MOVE_CAST(Function)(f),a);
        }

        auto context() -> decltype(_ex.context())
        {
            return _ex.context(); 
        }

        void on_work_started()
        {
            _ex.on_work_started();
        }
        void on_work_finished()
        {
            _ex.on_work_finished();
        }
};

它真的是一个微不足道的执行者,完全依赖底层执行者,continuation_executor::post重定向到底层执行者的defer

但是当我将一个处理程序传递给 async_read_some 时,使用 bind_executor(conti_exec, handler) 之类的东西,我得到以下输出:

dispatch() called

因此传递的处理程序不会通过 post() 进行调度;它是通过其他方式安排的。具体来说,内置的异步函数如 asio::async_read_some 通过 scheduler::post_immediate_completion 调度内部操作对象,然后 io_context::run 执行操作。

异步操作完成后,调用操作对象的complete方法来执行用户提供的处理程序。该 complete 方法,至少在当前实现中,使用关联执行程序的 dispatch 方法来 运行 处理程序。上面的钩子没有地方。所以它完全过时了;尝试使用 defer 而不是 asio_handler_is_continuation 是不走运的。

我在问题 "Starting from Boost 1.66, the code above is not likely to have any effect (without BOOST_ASIO_NO_DEPRECATION macro)." 中所说的完全错误。 asio_handler_is_continuation仍然有效,并且是not deprecated as of 1.67

这是asio_handler_is_continuation仍然有效的证据:

  // Start an asynchronous send. The data being sent must be valid for the
  // lifetime of the asynchronous operation.
  template <typename ConstBufferSequence, typename Handler>
  void async_send(base_implementation_type& impl,
      const ConstBufferSequence& buffers,
      socket_base::message_flags flags, Handler& handler)
  {
    bool is_continuation =
      boost_asio_handler_cont_helpers::is_continuation(handler);

    // Allocate and construct an operation to wrap the handler.
    typedef reactive_socket_send_op<ConstBufferSequence, Handler> op;
    typename op::ptr p = { boost::asio::detail::addressof(handler),
      op::ptr::allocate(handler), 0 };
    p.p = new (p.v) op(impl.socket_, impl.state_, buffers, flags, handler);

    BOOST_ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
          &impl, impl.socket_, "async_send"));

    start_op(impl, reactor::write_op, p.p, is_continuation, true,
        ((impl.state_ & socket_ops::stream_oriented)
          && buffer_sequence_adapter<boost::asio::const_buffer,
            ConstBufferSequence>::all_empty(buffers)));
    p.v = p.p = 0;
  }

请注意,它使用 boost_asio_handler_cont_helpers 来确定处理程序是否继续。 boost_asio_handler_cont_helpers 内部调用 asio_handler_is_continuation.

async_sendasync_write_some 内部使用。我没有检查 asio 库提供的每个内置异步任务,但我很确定其他异步任务以相同的方式执行它的处理程序。

因此,如果您希望内置异步任务继续执行您的处理程序,您将不得不依赖 asio_handler_is_continuationdefer 不会 完全 取代它! defer 只能在您直接从代码安排处理程序时使用。

看来,在https://github.com/chriskohlhoff/asio-tr2/blob/master/doc/executors.qbk上找到的代码中的注释实际上包含了对dispatch()post()defer()的最详细描述。

defer() 将新创建的作业的处理推迟到当前作业完成之后。它不会 将其推迟到其他排队的作业之后。推迟到当前作业完成后有一个很大的优势,即新作业可以 运行 在与当前作业相同的线程中。默认执行程序将尝试这样做。由于新作业很可能会使用当前作业中的一些或更多数据,因此留在同一个线程中并因此使用相同的 CPU 核心会大大改善缓存局部性,从而减少总执行时间并增加吞吐量。

换句话说:在您的完成处理程序/任务中,恰好启动一个新的完成处理程序/任务,您几乎总是希望使用 defer() 而不是 post()。如果新任务是在当前任务即将结束时启动的,则尤其如此。

但是,那些启动多个新任务的任务应该只通过 defer() 提交最相关的任务(通常是最后一个任务),并使用 post() 提交所有其他的。

只有那些非常简单的任务,才考虑通过 dispatch() 而不是 post()queue() 提交它们:如果规则允许(就像 strand,他们被分派到,当前有一个空队列),然后他们将直接运行,避免所有排队和取消排队延迟。