如何使用 boost 协程 ts 通过 co_await 等待变量发生变化?

How to co_await for a change in a variable using boost coroutine ts?

上下文
我使用 boost 协程 ts、boost asio 和 boost beast 构建了一个网络服务器。有一个用于读取的协程和一个用于写入的协程。有一个 message_to_send 队列,要发送给用户的消息会被推送到该队列。写入协程检查 message_to_send 队列中是否有内容,如果有就将其发送出去。发送之后,写入协程会自行挂起 100 毫秒,然后再次检查是否有要写入的内容。

问题
写入协程每 100 毫秒轮询一次消息队列。我希望找到一种不依赖计时器定时触发轮询的解决方案。

可能的解决方案
也许有办法对变量的变化进行 co_await。也许可以用“async_initiate”创建一个 async_wait_for_callback?

代码示例
您可以克隆 project。或者使用此处发布的完整示例代码:

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <exception>
#include <iostream>
#include <list>
#include <memory>
#include <set>
#include <stdexcept>
#include <string>
// TODO use cmake to find out if the compiler is gcc or clang
#include <coroutine> // enable if build with gcc
// #include <experimental/coroutine> //enable if build with clang
#include <boost/asio.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/system_timer.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/bind/bind.hpp>
#include <boost/optional.hpp>

// Bring the Asio and Beast names used throughout this file into scope.
using namespace boost::asio;
using namespace boost::beast;
using boost::asio::ip::tcp;

// System-clock waitable timer whose async operations use use_awaitable as the default completion token.
using CoroTimer = boost::asio::use_awaitable_t<>::as_default_on_t<boost::asio::basic_waitable_timer<boost::asio::chrono::system_clock>>;
// WebSocket stream layered on a Beast TCP stream.
using Websocket = boost::beast::websocket::stream<boost::beast::tcp_stream>;
// TCP acceptor whose async operations use use_awaitable as the default completion token.
using tcp_acceptor = use_awaitable_t<>::as_default_on_t<tcp::acceptor>;
struct User
{
  boost::asio::awaitable<void> writeToClient (std::weak_ptr<Websocket> &connection);
  std::deque<std::string> msgQueue{};
  std::shared_ptr<CoroTimer> timer{};
};

/**
 * Handles one message received from a client (placeholder implementation).
 *
 * Logs a notice, queues a fixed reply on `user` and wakes the user's write
 * coroutine by cancelling the timer it waits on.
 *
 * @param msg   Text received from the client (currently unused).
 * @param users All connected users (currently unused).
 * @param user  The user the message came from; must not be null.
 */
void
handleMessage (std::string const &msg, std::list<std::shared_ptr<User>> &users, std::shared_ptr<User> user)
{
  std::cout << "please implement handle message" << std::endl;
  user->msgQueue.push_back ("please implement handle message");
  // `timer` is default-initialised to an empty shared_ptr and is only
  // assigned inside User::writeToClient. If a message arrives before the
  // write coroutine has created it there is nothing to cancel, and
  // dereferencing it would be a null-pointer access.
  if (user->timer)
    {
      user->timer->cancel ();
    }
}

/**
 * Write loop for one client.
 *
 * Sends every message in `msgQueue` over the websocket. When the queue is
 * empty the coroutine parks itself on `timer`, which is set to a deadline
 * that is never reached; cancelling the timer (as handleMessage does) is the
 * wake-up signal. The loop ends when `connection` has expired or when a
 * write throws.
 *
 * NOTE(review): this relies on the producer and this coroutine running on
 * the same single-threaded executor (main builds the io_context with a
 * concurrency hint of 1) - confirm before adding threads.
 *
 * @param connection Weak reference to the client's websocket. The referenced
 *                   weak_ptr object must outlive the coroutine.
 */
boost::asio::awaitable<void>
User::writeToClient (std::weak_ptr<Websocket> &connection)
{
  try
    {
      // Create the timer once, before the first suspension point, so that a
      // producer always finds a timer to cancel and no allocation is needed
      // per wake-up.
      timer = std::make_shared<CoroTimer> (CoroTimer{ co_await this_coro::executor });
      while (not connection.expired ())
        {
          // Only park when there is nothing to send; messages that are
          // already queued must not wait for the next signal.
          if (msgQueue.empty ())
            {
              // A deadline at time_point::max () never fires on its own.
              timer->expires_at (CoroTimer::time_point::max ());
              try
                {
                  co_await timer->async_wait ();
                }
              catch (boost::system::system_error const &e)
                {
                  // Cancellation is the expected wake-up; anything else is fatal.
                  if (e.code () != boost::asio::error::operation_aborted)
                    {
                      std::cout << "error in timer boost::system::errc: " << e.code () << std::endl;
                      abort ();
                    }
                }
            }
          while (not msgQueue.empty ())
            {
              // Hold a strong reference for the duration of the write.
              auto const websocket = connection.lock ();
              if (not websocket)
                {
                  break; // connection is gone; the outer loop terminates
                }
              auto tmpMsg = std::move (msgQueue.front ());
              std::cout << " msg: " << tmpMsg << std::endl;
              msgQueue.pop_front ();
              co_await websocket->async_write (buffer (tmpMsg), use_awaitable);
            }
        }
    }
  catch (std::exception const &e)
    {
      std::cout << "write Exception: " << e.what () << std::endl;
    }
}

class Server
{
public:
  Server (boost::asio::ip::tcp::endpoint const &endpoint);

  boost::asio::awaitable<void> listener ();

private:
  void removeUser (std::list<std::shared_ptr<User>>::iterator user);
  boost::asio::awaitable<std::string> my_read (Websocket &ws_);

  boost::asio::awaitable<void> readFromClient (std::list<std::shared_ptr<User>>::iterator user, Websocket &connection);

  boost::asio::ip::tcp::endpoint _endpoint{};
  std::list<std::shared_ptr<User>> users{};
};

// Shorthand so coroutines can write `co_await this_coro::executor`.
namespace this_coro = boost::asio::this_coro;

// Remembers the endpoint to listen on; the acceptor itself is created in listener ().
Server::Server (boost::asio::ip::tcp::endpoint const &endpoint)
    : _endpoint{ endpoint }
{
}

/**
 * Reads one complete websocket message from `ws_`.
 *
 * Logs before the read and logs the size and text of the message afterwards.
 *
 * @param ws_ Websocket to read from.
 * @return The message payload as a string.
 */
awaitable<std::string>
Server::my_read (Websocket &ws_)
{
  std::cout << "read" << std::endl;
  flat_buffer incoming;
  co_await ws_.async_read (incoming, use_awaitable);
  std::string text = buffers_to_string (incoming.data ());
  std::cout << "number of letters '" << text.size () << "' msg: '" << text << "'" << std::endl;
  co_return text;
}

/**
 * Read loop for one client.
 *
 * Repeatedly reads a message from `connection` and passes it to
 * handleMessage. The loop only ends when a std::exception is thrown; the
 * user is then removed from `users` and the error is logged.
 *
 * @param user       Iterator to this client's entry in `users`.
 * @param connection Websocket to read from.
 */
awaitable<void>
Server::readFromClient (std::list<std::shared_ptr<User>>::iterator user, Websocket &connection)
{
  try
    {
      while (true)
        {
          std::string const received = co_await my_read (connection);
          handleMessage (received, users, *user);
        }
    }
  catch (std::exception const &error)
    {
      removeUser (user);
      std::cout << "read Exception: " << error.what () << std::endl;
    }
}

// Erases the entry `user` points at from `users`. `user` must be a valid,
// dereferenceable iterator into that list.
void
Server::removeUser (std::list<std::shared_ptr<User>>::iterator user)
{
  static_cast<void> (users.erase (user));
}

/**
 * Accept loop of the server.
 *
 * For every accepted TCP connection this performs the websocket handshake,
 * registers a User and spawns two detached coroutines: one reading from the
 * client and one writing queued messages to it. Exceptions raised while
 * accepting or handshaking are logged and the loop continues.
 */
awaitable<void>
Server::listener ()
{
  auto executor = co_await this_coro::executor;
  tcp_acceptor acceptor (executor, _endpoint);
  for (;;)
    {
      try
        {
          auto socket = co_await acceptor.async_accept ();
          auto connection = std::make_shared<Websocket> (std::move (socket));
          connection->set_option (websocket::stream_base::timeout::suggested (role_type::server));
          connection->set_option (websocket::stream_base::decorator ([] (websocket::response_type &res) { res.set (http::field::server, std::string (BOOST_BEAST_VERSION_STRING) + " websocket-server-async"); }));
          co_await connection->async_accept (use_awaitable);
          // Register the user only after the handshake succeeded, so a failed
          // handshake does not leave a stale entry in `users`.
          users.emplace_back (std::make_shared<User> ());
          auto userItr = std::prev (users.end ());
          // Both coroutines capture the iterator and the shared_ptr<User> by
          // value: the loop-local variables are gone long before the detached
          // coroutines finish, and the User must stay alive while either
          // coroutine still uses it, even after it was erased from `users`.
          co_spawn (
              executor,
              [this, connection, userItr, user = *userItr] () mutable -> awaitable<void> {
                co_await readFromClient (userItr, *connection);
                // Reading is over: drop the connection first, then wake the
                // writer so it sees the expired weak_ptr and terminates
                // instead of waiting on its timer forever.
                connection.reset ();
                if (user->timer)
                  {
                    user->timer->cancel ();
                  }
              },
              detached);
          co_spawn (
              executor, [connectionWeakPointer = std::weak_ptr<Websocket>{ connection }, user = *userItr] () mutable { return user->writeToClient (connectionWeakPointer); }, detached);
        }
      catch (std::exception const &e)
        {
          std::cout << "Server::listener () connect  Exception : " << e.what () << std::endl;
        }
    }
}

// TCP port the server listens on. std::uint16_t replaces the non-standard
// u_int16_t typedef, which is only available on some platforms.
auto const DEFAULT_PORT = std::uint16_t{ 55555 };

int
main ()
{
  try
    {
      using namespace boost::asio;
      io_context io_context (1);
      signal_set signals (io_context, SIGINT, SIGTERM);
      signals.async_wait ([&] (auto, auto) { io_context.stop (); });
      auto server = Server{ { ip::tcp::v4 (), DEFAULT_PORT } };
      co_spawn (
          io_context, [&server] { return server.listener (); }, detached);
      io_context.run ();
    }
  catch (std::exception &e)
    {
      std::printf ("Exception: %s\n", e.what ());
    }
  return 0;
}

编辑:根据标记为答案的 sehe 的想法更新了代码。

经典的线程解决方案是条件变量。当然,这不是你想要的——我看到你甚至明确地禁用了 ASIO 线程。很好。

除了提供一个 Asio 服务来实现此行为之外,一种方法是使用计时器来模拟条件变量。您可以使用“永不”过期的计时器(截止时间为 timepoint::max()),并在需要发出条件信号时手动将其重置为 timepoint::min() 或过去的任意时间点(这会取消所有挂起的 async_wait)。然后,您可以像已经熟悉的那样,将 Timer::async_wait 与 use_awaitable 搭配使用。

请注意,您仍然需要“手动”发出变化的信号。这正是您想要的,因为其他任何方式都需要内核的进程跟踪支持或硬件调试器设施,它们需要很高的权限,而且往往非常慢。

You might want to know about associating the use_awaitable as the default completion token for the executor bound to your timer. See e.g. the examples: https://www.boost.org/doc/libs/1_78_0/doc/html/boost_asio/example/cpp17/coroutines_ts/echo_server_with_default.cpp (the HTML docs do NOT link these examples)