Asio 如何编写自定义 AsyncStream?

Asio How to write a custom AsyncStream?

我实际上已经设法编写了一个可用的 AsyncStream。但是,我不确定我是否按照预期的方式进行了操作。

我的主要问题是:get_executor() 函数应该 return 是哪个执行程序?

在实施过程中出现了几个问题。我用 Q<index>: 标记了它们。 (我会在编辑时保持索引稳定。)非常感谢他们的回答。

我尽量shorten/simplify这个例子。它确实可以正确编译和执行。

#include <iostream>
#include <syncstream>
#include <thread>
#include <coroutine>
#include <future>
#include <random>
#include <string>
#include <memory>

#include <boost/asio.hpp>
#include <boost/asio/experimental/as_tuple.hpp>

#include <fmt/format.h>

inline std::osyncstream tout(const std::string & tag = "") {
  auto hash = std::hash<std::thread::id>{}(std::this_thread::get_id());
  auto hashStr = fmt::format("T{:04X} ", hash >> (sizeof(hash) - 2) * 8); // only display 2 bytes
  auto stream = std::osyncstream(std::cout);

  stream << hashStr;
  if (not tag.empty())
    stream << tag << " ";
  return stream;
}

namespace asio = boost::asio;

template <typename Executor>
requires asio::is_executor<Executor>::value // Q1: Is this the correct way to require that Executor actually is an executor?
                                            // I can't replace typename as there is no concept for Executors.
class Service : public std::enable_shared_from_this<Service<Executor>> {
  template<typename CallerExecutor, typename ServiceExecutor>
  // requires asio::is_executor<CallerExecutor>::value && asio::is_executor<ServiceExecutor>::value
  friend class MyAsyncStream;
  /// Data sent to the service
  std::string bufferIn;
  /// Data produced by the service
  std::string bufferOut;
  /// The strand used to avoid concurrent execution if the passed executor is backed by multiple threads.
  asio::strand<Executor> strand;
  /// Used to slow the data consumption and generation
  asio::steady_timer timer;

  /// Used to generate data
  std::mt19937 gen;
  /// 
  constexpr static const char charset[] =
    "0123456789"
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "abcdefghijklmnopqrstuvwxyz";
  template<typename URBG>
  static std::string gen_string(std::size_t length, URBG &&g) {
    std::string result;
    result.resize(length);
    std::sample(std::cbegin(charset),
                std::cend(charset),
                std::begin(result),
                std::intptr_t(length),
                std::forward<URBG>(g));
    return result;
  }

  static const constexpr auto MAX_OPS = 7;

  asio::awaitable<void> main(std::shared_ptr<Service> captured_self) {
    const constexpr auto TAG = "SrvCo";
    auto exe = co_await asio::this_coro::executor;
    auto use_awaitable = asio::bind_executor(exe, asio::use_awaitable);

    for (size_t ops = 0; ops < MAX_OPS; ops++) {
      timer.expires_after(std::chrono::milliseconds(1000));
      co_await timer.async_wait(use_awaitable);

      tout(TAG) << "Ops " << ops << std::endl;

      bufferOut += gen_string(8, gen);
      tout(TAG) << "Produced: " << bufferOut << std::endl;

      auto consumed = std::string_view(bufferIn).substr(0, 4);
      tout(TAG) << "Consumed: " << consumed << std::endl;
      bufferIn.erase(0, consumed.size());
    }
    tout(TAG) << "Done" << std::endl;
  }
  std::once_flag initOnce;

public:

  explicit Service(Executor && exe) : strand{asio::make_strand(exe)}, timer{exe.context()} {}

  void init() {
    std::call_once(initOnce, [this]() {
      asio::co_spawn(strand, main(this->shared_from_this()), asio::detached);
    });
  }
};

/// https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/AsyncReadStream.html
template<typename CallerExecutor, typename ServiceExecutor>
// requires asio::is_executor<CallerExecutor>::value && asio::is_executor<ServiceExecutor>::value // Q2: Q1 is working why isn't this working with two Types?
class MyAsyncStream {
  typedef void async_rw_handler(boost::system::error_code, size_t);
  /// Holds the callers executor.
  /// Q3: Should this field even exist?
  CallerExecutor executor;
  /// Use a weak_ptr to behave like a file descriptor.
  std::weak_ptr<Service<ServiceExecutor>> serviceRef;
public:
  explicit MyAsyncStream(std::shared_ptr<Service<ServiceExecutor>> & service, CallerExecutor & exe) : executor{exe}, serviceRef{service} {}

  /// Needed by the stream specification.
  typedef CallerExecutor executor_type;

  /**
   * Q4: Which executor should this function return? The CallerExecutor or the ServiceExecutor or something different.
   *     In this example it is never called. However it is needed by the stream specification. https://www.boost.org/doc/libs/1_79_0/doc/html/boost_asio/reference/AsyncReadStream.html
   * I really don't want to leak the ServiceExecutor to library users.
   * @return Returns the executor supplied in the constructor.
   */
  auto get_executor() {
    tout() << "GETTING EXE" << std::endl;
    return executor;
  }

  template<typename MutableBufferSequence,
    asio::completion_token_for<async_rw_handler>
    CompletionToken = typename asio::default_completion_token<CallerExecutor>::type>
  requires asio::is_mutable_buffer_sequence<MutableBufferSequence>::value
  auto async_read_some(const MutableBufferSequence &buffer,
                       CompletionToken &&token = typename asio::default_completion_token<CallerExecutor>::type()) {
    return asio::async_initiate<CompletionToken, async_rw_handler>([&](auto completion_handler) { // Q5: Can I avoid this async_initiate somehow?
      BOOST_ASIO_READ_HANDLER_CHECK(CompletionToken, completion_handler) type_check;              // I tried using co_spawn directly without success.
      asio::co_spawn(
          asio::get_associated_executor(completion_handler), // Q6-1: should I use get_executor() here? Currently, I just get the callers executor.
          [&, buffer = std::move(buffer), completion_handler = std::forward<CompletionToken>(completion_handler)]
          () mutable -> asio::awaitable<void> {
        const constexpr auto TAG = "ARS";
        auto callerExe = co_await asio::this_coro::executor;
        auto to_caller = asio::bind_executor(callerExe, asio::use_awaitable);

        auto service = serviceRef.lock();
        if (service == nullptr) {
          std::move(completion_handler)(asio::error::bad_descriptor, 0);
          co_return;
        }
        auto to_service = asio::bind_executor(service->strand, asio::use_awaitable);

        co_await asio::post(to_service);

        tout(TAG) << "performing read" << std::endl;

        auto buf_begin = asio::buffers_begin(buffer);
        auto buf_end = asio::buffers_end(buffer);
        boost::system::error_code err = asio::error::fault;
        size_t it = 0;
        while (!service->bufferOut.empty()) {
          if (buf_begin == buf_end) {
            // error the buffer is smaller than the request read amount
            err = asio::error::no_buffer_space;
            goto completion;
          }

          *buf_begin++ = service->bufferOut.at(0);
          service->bufferOut.erase(0, 1);
          it++;
        }
        err = asio::stream_errc::eof;
        completion:
        co_await asio::post(to_caller); // without this call the function returns on the wrong thread
        tout(TAG) << "read done returned" << std::endl;
        std::move(completion_handler)(err, it);
      }, asio::detached);
    }, token);
  }

  template<typename ConstBufferSequence,
    asio::completion_token_for <async_rw_handler>
    CompletionToken = typename asio::default_completion_token<CallerExecutor>::type>
  requires asio::is_const_buffer_sequence<ConstBufferSequence>::value
  auto async_write_some(const ConstBufferSequence &buffer,
                        CompletionToken &&token = typename asio::default_completion_token<CallerExecutor>::type()) {
    return asio::async_initiate<CompletionToken, async_rw_handler>([&](auto completion_handler) {
      BOOST_ASIO_WRITE_HANDLER_CHECK(CompletionToken, completion_handler) type_check;
      asio::co_spawn(
          asio::get_associated_executor(completion_handler), // Q6-2: should I use get_executor() here? Currently, I just get the callers executor.
          [&, buffer = std::move(buffer), completion_handler = std::forward<CompletionToken>(completion_handler)]
          () mutable -> asio::awaitable<void> {
        const constexpr auto TAG = "AWS";
        auto callerExe = co_await asio::this_coro::executor;
        auto to_caller = asio::bind_executor(callerExe, asio::use_awaitable);

        auto service = serviceRef.lock();
        if (service == nullptr) {
          std::move(completion_handler)(asio::error::bad_descriptor, 0);
          co_return;
        }
        auto to_service = asio::bind_executor(service->strand, asio::use_awaitable);

        co_await asio::post(to_service);

        tout(TAG) << "performing write" << std::endl;

        auto buf_begin = asio::buffers_begin(buffer);
        auto buf_end = asio::buffers_end(buffer);
        boost::system::error_code err = asio::error::fault;
        size_t it = 0;
        while (buf_begin != buf_end) {
          service->bufferIn.push_back(static_cast<char>(*buf_begin++));
          it++;
        }
        err = asio::stream_errc::eof;
        completion:
        co_await asio::post(to_caller); // without this call the function returns on the wrong thread
        tout(TAG) << "write done returned" << std::endl;
        std::move(completion_handler)(err, it);
      }, asio::detached);
    }, token);
  }
};

asio::awaitable<int> mainCo() {
  const constexpr auto TAG = "MainCo";
  auto exe = co_await asio::this_coro::executor;
  auto use_awaitable = asio::bind_executor(exe, asio::use_awaitable);
  auto as_tuple  = asio::experimental::as_tuple(use_awaitable);
  auto use_future = asio::use_future;
  auto timer = asio::steady_timer(exe);

  asio::thread_pool servicePool{1};

  co_await asio::post(asio::bind_executor(servicePool, asio::use_awaitable));
  tout() << "ServiceThread run start" << std::endl;
  co_await asio::post(use_awaitable);

  auto service = std::make_shared<Service<boost::asio::thread_pool::basic_executor_type<std::allocator<void>, 0> >>(servicePool.get_executor());
  service->init();
  auto stream = MyAsyncStream{service, exe};

  for (size_t it = 0; it < 4; it++) {
    {
      std::vector<char> dataBackend;
      auto dynBuffer = asio::dynamic_buffer(dataBackend, 50);
      auto [ec, n] = co_await asio::async_read(stream, dynBuffer, as_tuple); // Q7-1: Can I avoid using as_tuple here?

      tout(TAG) << "read done: " << std::endl
                << "n:   " << n  << std::endl
                << "msg: " << std::string{dataBackend.begin(), dataBackend.end()} << std::endl
                << "ec:  " << ec.message()
                << std::endl;
    }

    {
      auto const constexpr str = std::string_view{"HelloW"};
      std::vector<char> dataBackend{str.begin(), str.end()};

      auto dynBuffer = asio::dynamic_buffer(dataBackend, 50);
      auto [ec, n] = co_await asio::async_write(stream, dynBuffer, as_tuple); // Q7-2: Can I avoid using as_tuple here?

      tout(TAG) << "write done: " << std::endl
                << "n:   " << n   << std::endl
                << "msg: " << str << std::endl
                << "ec:  " << ec.message()
                << std::endl;
    }


    timer.expires_after(std::chrono::milliseconds(2500));
    co_await timer.async_wait(use_awaitable);
  }

  servicePool.join();
  tout(TAG) << "Normal exit" << std::endl;
  co_return 0;
}

int main() {
  asio::io_context appCtx;

  auto fut = asio::co_spawn(asio::make_strand(appCtx), mainCo(), asio::use_future);

  tout() << "MainThread run start" << std::endl;
  appCtx.run();
  tout() << "MainThread run done" << std::endl;

  return fut.get();
}

  • Q1

    我想看起来不错。但是,请参阅 Q2。

  • Q2

    看起来它杀死了 AsyncStream 的 CTAD。如果我不得不猜测这是因为 ServiceExecutor 在 non-deduced 上下文中。手动帮助它可能会有所帮助,但请注意此处的第二个静态断言是如何失败的:

      using ServiceExecutor = asio::thread_pool::executor_type;
      using CallerExecutor = asio::any_io_executor;
      static_assert(asio::is_executor<ServiceExecutor>::value);
      static_assert(asio::is_executor<CallerExecutor>::value);
    

    那是因为co_await this_coro::executorreturnsany_io_executor,是不同“牌子”的executor。您需要使用 execution::is_executor<T>::value 进行检查。事实上,您可能希望像在 Asio 实现函数中那样进行兼容性检查:

        (is_executor<Executor>::value || execution::is_executor<Executor>::value)
          && is_convertible<Executor, AwaitableExecutor>::value
    

PS: It dawned on me that the non-deduced context is a symptom of overly-specific template arguments. Just make AsyncStream<Executor, Service> (why bother with the specific type arguments that are implementation details of Service?). That fixes the CTAD (Live On Compiler Explorer)

template <typename CallerExecutor, typename Service>
requires my_is_executor<CallerExecutor>::value //
class MyAsyncStream {
  • Q3:这个字段应该存在吗?

     CallerExecutor executor;
    

    是的,这就是 IO 对象记住其绑定执行程序的方式。

  • Q4:那是你 return 调用者执行者所在的位置。

    它没有在您的应用程序中调用,但它可能会被调用。如果您针对您的 IO 对象 (MyAsyncStream) 调用任何组合操作(如 asio::async_read_until),它会 - 默认情况下 - 运行 关联执行程序上的任何处理程序。这可能会添加正确性所需的行为(如处理程序序列化、工作跟踪等)。

    像以往一样,处理程序可以绑定到另一个执行程序以覆盖它。

  • Q5 我不这么认为,除非您想强制使用 use_awaitable(或兼容的)完成令牌。事实上,你 运行 内部的 coro 应该是调用者的实现细节。

  • Q6 是的,但不是而是关闭。我假设您需要使用 IO 对象的执行程序作为后备:

     asio::get_associated_executor(
         completion_handler, this->get_executor())
    
  • Q7-1:我可以避免在这里使用as_tuple吗?

     auto [ec, n] = co_await asio::async_read(stream, dynBuffer, as_tuple);
    

    我想如果你能“仅仅”处理 system_error 个异常:

    auto n = co_await asio::async_read(stream, dynBuffer, use_awaitable);
    

    或者,我相信maybe redirect_error is applicable?