我应该如何在 boost::asio 的客户端应用程序中同时使用 async_read_until 和 async_write?

How should I use async_read_until and async_write simultaneously in the client app in boost::asio?

我的动力。

我正在尝试构建一个简单的信使。目前,我已经编写了支持“类似功能的邮件”的客户端和服务器应用程序,也就是说,它们缺少您在每个即时通讯程序中都拥有的聊天交互。

这是我使用的模型。

服务器: 每个连接的客户端的服务器都有一个专用的 Service class 提供实际服务。 Service class 的一个实例有一个 id。

客户端:在特定时刻同时开始从关联的 Service 实例读取消息和向其写入消息。

Tracker:通过在地图中保存用户的登录名和 Service ID 来记录用户的当前会话。还通过保存键值对(聊天参与者 id 1,聊天参与者 id 2)记录打开的聊天。我交替使用用户的登录名和 ID,因为我有一个数据库。

这是一个典型的使用场景。

  1. 用户正在尝试登录。服务器将 ID 为 1 的 Service 实例专用于该用户。然后该用户被标识为 Bob。
  2. Bob 打开与 Ann 的聊天。 Tracker 记录 Bob 使用 Service 1 并且 Bob 打开了与 Ann 的聊天。
  3. 用户正在尝试登录。服务器将 ID 为 2 的 Service 实例专用于该用户。然后该用户被标识为 Ann。
  4. Ann 打开与 Bob 的聊天。 Tracker 记录 Ann 使用 Service 2 并且 Ann 打开了与 Bob 的聊天。
  5. Ann 给 Bob 写了一条消息。 Service 2 收到消息并要求 Service 1 如果 Bob 打开了与 Ann 的聊天,则将消息发送到 Bob 的聊天。为此,我使用 Tracker。在我们的例子中,Bob 正在聊天,因此 Bob 的客户端应用程序应该从 Service 1 读取消息。否则 Service 2 仅将新消息存储在数据库中。

当用户打开与某人的聊天时,客户端应用程序同时开始向关联 Service 实例读取和写入消息。

问题

  1. Bob 打开与 Ann 的聊天。 Ann 打开与 Bob 的聊天。
  2. 安发消息。它们显示在 Bobs 聊天中。
  3. 鲍勃发送了一条消息。它不会显示在 Ann 的聊天中。此外,Ann 的更多消息不再显示在 Bob 的聊天中。

这是我的服务器代码的一部分。我添加了一些上下文,但您可能想查看 Service::onMessageReceivedService::receive_messageService::send_to_chat

/// Struct to track active sessions of clients
struct Tracker {
  static std::mutex current_sessions_guard; ///< mutex to lock the map of current sessions between threads
  static std::map<long, long> current_sessions; 
  static std::map<long, int> client_to_service_id;
};

Class 在客户端服务模型中提供实际服务

class Service {
public:
  void send_to_chat(const std::string& new_message) {
    asio::async_write(*m_sock.get(), asio::buffer(new_message),
      [this]() {
        onAnotherPartyMessageSent(); 
      });
  } 

private:
  void onReceivedReady();
  void receive_message() {
    /// Server loop for reading messages from the client
    spdlog::info("[{}] in receive_message", service_id);

    asio::async_read_until(*m_sock.get(), *(m_message.get()), '\n',
      [this]() {
        onMessageReceived();
      });
  } 
  void onMessageReceived();

private:
  std::shared_ptr<asio::ip::tcp::socket> m_sock; ///< Pointer to an active socket that is used to communicate
                                                 ///< with the client
  int service_id;  
  long dialog_id = -1, client_id = -1, another_party_id = -1;
  std::shared_ptr<asio::streambuf> m_message;
};

方法定义


void Service::onMessageReceived() {
  /// Updates the database with the new message and asks Service instance of another participant
  /// to send the message if they opened this chat.

  std::istream istrm(m_message.get());
  std::string new_message;
  std::getline(istrm, new_message);
  m_message.reset(new asio::streambuf);

  std::unique_lock<std::mutex> tracker_lock(Tracker::current_sessions_guard);

  if (Tracker::current_sessions.find(another_party_id) != Tracker::current_sessions.end()) {
    if (Tracker::current_sessions[another_party_id] == client_id) {
      int another_party_service_id = Tracker::client_to_service_id[another_party_id];
      std::string formatted_msg = _form_message_str(login, new_message);
      
      spdlog::info("[{}] sends to chat '{}'", another_party_service_id, new_message);

      Server::launched_services[another_party_service_id]->send_to_chat(formatted_msg);
    }
  }
  tracker_lock.unlock();
  receive_message();
} 

这是我的客户端代码的一部分。我添加了一些上下文,但您可能想查看 AsyncTCPClient::onSentReadyAsyncTCPClient::message_send_loopAsyncTCPClient::message_wait_loop.

/// Struct that stores a session with the given server
struct Session {
  asio::ip::tcp::socket m_sock; //!< The socket for the client application to connect to the server
  asio::ip::tcp::endpoint m_ep; //!< The server's endpoint
  std::string current_chat;

  std::shared_ptr<asio::streambuf> m_chat_buf;
  std::shared_ptr<asio::streambuf> m_received_message;
};

/// Class that implements an asynchronous TCP client to interact with Service class
class AsyncTCPClient: public asio::noncopyable {

  void onSentReady(std::shared_ptr<Session> session) {
  
    msg_wait_thread.reset(new std::thread([this, session] {
      asio::async_read_until(session->m_sock, *(session->m_received_message.get()), "\n", 
        [this, session] () {
          message_wait_loop(session);
        });
      }));
    msg_wait_thread->detach();

    msg_thread.reset(new std::thread([this, session] {
      message_send_loop(session);
      }));

    msg_thread->detach();
  } 


  void message_send_loop(std::shared_ptr<Session> session) {
    /// Starts loop in the current chat enabling the client to keep sending messages to another party
    logger->info("'{}' in message_send_loop", session->login);

    clear_console();
    m_console.write(session->current_chat);
    m_console.write("Write your message: ");

    std::string new_message;

    // We use a do/while loop to prevent empty messages either because of the client input
    // or \n's that were not read before

    do {
      new_message = m_console.read();
    } while (new_message.empty());
    

    std::unique_lock<std::mutex> lock_std_out(std_out_guard);
    session->current_chat.append(_form_message_str(session->login, new_message));
    lock_std_out.unlock();

    asio::async_write(session->m_sock, asio::buffer(new_message + "\n"), 
      [this, session] () {
        message_send_loop(session);
      }); 
  } 

  void message_wait_loop(std::shared_ptr<Session> session) {
    /// Starts loop in the current chat enabling the client to keep reading messages from another party

    logger->info("'{}' in message_wait_loop", session->login);

    std::istream istrm(session->m_received_message.get());
    std::string received_message;
    std::getline(istrm, received_message);

    session->m_received_message.reset(new asio::streambuf);

    std::unique_lock<std::mutex> lock_std_out(std_out_wait_guard);
    session->current_chat.append(received_message + "\n");
    lock_std_out.unlock();

    clear_console();
    m_console.write(session->current_chat);
    m_console.write("Write your message: ");
    
    asio::async_read_until(session->m_sock, *(session->m_received_message.get()), "\n", 
      [this, session] (std::size_t) {
        message_wait_loop(session);
      });
  }

private:
  asio::io_context m_ios;
};

因此,当我描述问题时,我没有 "'{}' in message_wait_loop" 点 3) 的两个客户端的日志。但是我在第 2) 点为 Bob 的客户提供了这些日志。

我也使用 的控制台。它通过互斥体删除回声并控制标准 input/output 资源。但是它并没有解决我的问题。

如有任何帮助,我们将不胜感激。

代码太多太少。这个问题太多了,而实际提出改进建议太少了。我看到过度使用 shared_ptr,线程,特别是 运行 在他们自己的线程上进行异步操作非常奇怪。更别提了:

msg_wait_thread.reset(new std::thread([this, session] {
      asio::async_read_until(session->m_sock, *(session->m_received_message.get()), "\n", 
        [this, session] () {
          message_wait_loop(session);
        });
      }));
    msg_wait_thread->detach();

最好用完全等价的(但更安全)替代整个东西

  asio::async_read_until(session->m_sock, *(session->m_received_message.get()), "\n", 
    [this, session] () {
      message_wait_loop(session);
    });

我想读取循环在一个线程上,这样输入就不会阻塞。但是,如果您将主线程视为“UI 线程”(它是),并接受控制台 IO 阻塞在那里,而不是将结果请求发布到所有非线程的单个 IO 线程,它会变得容易得多阻止操作。

如果您将 link 分享到存储库或其他内容,我很乐意查看更多。

更新

在评论中,我查看了 github 存储库中的代码并发布了 PR:https://github.com/cepessh/mymsg/pull/1

This is a very raw proof-of-concept. I have included many changes that aren't actually related to the suggested concurrency fix, but they happened:

  • to allow me to run
  • during review (you will probably want to look at a number of those changes and keep them anyways)
  • fixes that were apparently missing from main branch (e.g. the default value for Message.read_by_recipient database column)

You should be able to work out what changes were made and why by the commit messages.

Only the last two commits actually focus on the idea discussed in chat.