Server crashing while being interrupted sending large chunk of data

My server crashes when I gracefully close a client connected to it while that client is receiving a big chunk of data. I suspect a lifetime error, like most errors in Boost.Asio, but I was not able to spot it myself.

Each client establishes 2 connections to the server: one of them is used for syncing, the other is a long-lived connection for receiving continuous updates. In the "syncing phase" the client receives a large amount of data in order to sync with the server state (the "state" is basically database data in JSON format). After syncing, the sync connection is closed. The client then receives updates to the database over the other connection (which are of course very small amounts of data compared to the "sync data").

These are the relevant files:

connection.h

#pragma once

#include <array>
#include <memory>
#include <string>
#include <boost/asio.hpp>

class ConnectionManager;

/// Represents a single connection from a client.
class Connection : public std::enable_shared_from_this<Connection>
{
public:
  Connection(const Connection&) = delete;
  Connection& operator=(const Connection&) = delete;

  /// Construct a connection with the given socket.
  explicit Connection(boost::asio::ip::tcp::socket socket, ConnectionManager& manager);

  /// Start the first asynchronous operation for the connection.
  void start();

  /// Stop all asynchronous operations associated with the connection.
  void stop();

  /// Perform an asynchronous write operation.
  void do_write(const std::string& buffer);

  int getNativeHandle();

  ~Connection();

private:
  /// Perform an asynchronous read operation.
  void do_read();

  /// Socket for the connection.
  boost::asio::ip::tcp::socket socket_;

  /// The manager for this connection.
  ConnectionManager& connection_manager_;

  /// Buffer for incoming data.
  std::array<char, 8192> buffer_;

  std::string outgoing_buffer_;
};

typedef std::shared_ptr<Connection> connection_ptr;

connection.cpp

#include "connection.h"

#include <utility>
#include <vector>
#include <iostream>
#include <thread>

#include "connection_manager.h"

Connection::Connection(boost::asio::ip::tcp::socket socket, ConnectionManager& manager)
    : socket_(std::move(socket))
    , connection_manager_(manager)
{
}

void Connection::start()
{
  do_read();
}

void Connection::stop()
{
  socket_.close();
}

Connection::~Connection()
{
}

void Connection::do_read()
{
  auto self(shared_from_this());
  socket_.async_read_some(boost::asio::buffer(buffer_), [this, self](boost::system::error_code ec, std::size_t bytes_transferred) {
        if (!ec) {
            std::string buff_str = std::string(buffer_.data(), bytes_transferred);
            const auto& tokenized_buffer = split(buff_str, ' ');
            
            if(!tokenized_buffer.empty() && tokenized_buffer[0] == "sync") {
                /// "syncing connection" sends a specific text
                /// hence I can separate between syncing and long-lived connections here and act accordingly.

                const auto& exec_json_strs = getExecutionJsons();
                const auto& order_json_strs = getOrdersAsJsons();
                const auto& position_json_strs = getPositionsAsJsons();
                const auto& all_json_strs = exec_json_strs + order_json_strs + position_json_strs + createSyncDoneJson();
                
                /// this is potentially a very large data.
                do_write(all_json_strs);
            }

            do_read();
        } else {
          connection_manager_.stop(shared_from_this());
        }
      });
}

void Connection::do_write(const std::string& write_buffer)
{
  outgoing_buffer_ = write_buffer;

  auto self(shared_from_this());
  boost::asio::async_write(socket_, boost::asio::buffer(outgoing_buffer_, outgoing_buffer_.size()), [this, self](boost::system::error_code ec, std::size_t transfer_size) {
        if (!ec) {
           /// everything is fine.
        } else {
           /// what to do here?
           /// server crashes once I get error code 32 (EPIPE) here.
        }
      });
}

connection_manager.h

#pragma once

#include <set>
#include "connection.h"

/// Manages open connections so that they may be cleanly stopped when the server
/// needs to shut down.
class ConnectionManager
{
public:
  ConnectionManager(const ConnectionManager&) = delete;
  ConnectionManager& operator=(const ConnectionManager&) = delete;

  /// Construct a connection manager.
  ConnectionManager();

  /// Add the specified connection to the manager and start it.
  void start(connection_ptr c);

  /// Stop the specified connection.
  void stop(connection_ptr c);

  /// Stop all connections.
  void stop_all();

  void sendAllConnections(const std::string& buffer);

private:
  /// The managed connections.
  std::set<connection_ptr> connections_;
};

connection_manager.cpp

#include "connection_manager.h"

ConnectionManager::ConnectionManager()
{
}

void ConnectionManager::start(connection_ptr c)
{
  connections_.insert(c);
  c->start();
}

void ConnectionManager::stop(connection_ptr c)
{
    connections_.erase(c);
    c->stop();
}

void ConnectionManager::stop_all()
{
  for (auto c: connections_)
    c->stop();

  connections_.clear();
}

/// this function is used to keep clients up to date with the changes, not used during syncing phase.
void ConnectionManager::sendAllConnections(const std::string& buffer)
{
  for (auto c: connections_)
      c->do_write(buffer);
}

server.h

#pragma once

#include <boost/asio.hpp>
#include <string>
#include "connection.h"
#include "connection_manager.h"

class Server
{
public:
  Server(const Server&) = delete;
  Server& operator=(const Server&) = delete;

  /// Construct the server to listen on the specified TCP address and port, and
  /// serve up files from the given directory.
  explicit Server(const std::string& address, const std::string& port);

  /// Run the server's io_service loop.
  void run();

  void deliver(const std::string& buffer);

private:
  /// Perform an asynchronous accept operation.
  void do_accept();

  /// Wait for a request to stop the server.
  void do_await_stop();

  /// The io_service used to perform asynchronous operations.
  boost::asio::io_service io_service_;

  /// The signal_set is used to register for process termination notifications.
  boost::asio::signal_set signals_;

  /// Acceptor used to listen for incoming connections.
  boost::asio::ip::tcp::acceptor acceptor_;

  /// The connection manager which owns all live connections.
  ConnectionManager connection_manager_;

  /// The *NEXT* socket to be accepted.
  boost::asio::ip::tcp::socket socket_;
};

server.cpp

#include "server.h"
#include <signal.h>
#include <utility>

Server::Server(const std::string& address, const std::string& port)
    : io_service_()
    , signals_(io_service_)
    , acceptor_(io_service_)
    , connection_manager_()
    , socket_(io_service_)
{
  // Register to handle the signals that indicate when the server should exit.
  // It is safe to register for the same signal multiple times in a program,
  // provided all registration for the specified signal is made through Asio.
  signals_.add(SIGINT);
  signals_.add(SIGTERM);
#if defined(SIGQUIT)
  signals_.add(SIGQUIT);
#endif // defined(SIGQUIT)

  do_await_stop();

  // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
  boost::asio::ip::tcp::resolver resolver(io_service_);
  boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve({address, port});
  acceptor_.open(endpoint.protocol());
  acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
  acceptor_.bind(endpoint);
  acceptor_.listen();

  do_accept();
}

void Server::run()
{
  // The io_service::run() call will block until all asynchronous operations
  // have finished. While the server is running, there is always at least one
  // asynchronous operation outstanding: the asynchronous accept call waiting
  // for new incoming connections.
  io_service_.run();
}

void Server::do_accept()
{
  acceptor_.async_accept(socket_,
      [this](boost::system::error_code ec)
      {
        // Check whether the server was stopped by a signal before this
        // completion handler had a chance to run.
        if (!acceptor_.is_open())
        {
          return;
        }

        if (!ec)
        {
          connection_manager_.start(std::make_shared<Connection>(
              std::move(socket_), connection_manager_));
        }

        do_accept();
      });
}

void Server::do_await_stop()
{
  signals_.async_wait(
      [this](boost::system::error_code /*ec*/, int /*signo*/)
      {
        // The server is stopped by cancelling all outstanding asynchronous
        // operations. Once all operations have finished the io_service::run()
        // call will exit.
        acceptor_.close();
        connection_manager_.stop_all();
      });
}

/// this function is used to keep clients up to date with the changes, not used during syncing phase.
void Server::deliver(const std::string& buffer)
{
    connection_manager_.sendAllConnections(buffer);
}   

So, to repeat my question: my server crashes when I gracefully close a client connected to it while that client is receiving a big chunk of data, and I do not know why.

EDIT: The crash happens in the async_write call, when I get the EPIPE error. The application is multithreaded: there are 4 threads that call Server::deliver with their own data as they produce it. deliver() is used to keep the clients up to date; it has nothing to do with the initial syncing, which is done with persistent data fetched from the database.

I have a single io_service, so I think I do not need strands. io_service::run is called on the main thread, so the main thread is blocking.

Reviewing this, and adding some missing code bits:

namespace /*missing code stubs*/ {
    auto split(std::string_view input, char delim) {
        std::vector<std::string_view> result;
        boost::algorithm::split(result, input,
                                boost::algorithm::is_from_range(delim, delim));
        return result;
    }

    std::string getExecutionJsons()   { return ""; }
    std::string getOrdersAsJsons()    { return ""; }
    std::string getPositionsAsJsons() { return ""; }
    std::string createSyncDoneJson()  { return ""; }
}

Now the things I notice:

  1. You have just one io_service, so just one thread. Okay, so no strands should be required, unless you have threads elsewhere in your code (in main, for example?).

  2. One particular reason to suspect that threads are in play is that nobody could possibly be calling Server::deliver, because run() is blocking. That means that whenever deliver() is called now, it causes a data race, which leads to Undefined Behaviour.

    The offhand comment

     /// this function is used to keep clients up to date with the changes,
     /// not used during syncing phase.
    

    does little to remove that concern. The code needs to guard against misuse. Comments are not executed. Do it better:

     void Server::deliver(const std::string& buffer) {
         post(io_context_,
              [this, buffer] { connection_manager_.broadcast(std::move(buffer)); });
     }
    
  3. You do not check that a previous write has completed before accepting a "new" one. That means calling Connection::do_write leads to Undefined Behaviour for two reasons:

    • modifying outgoing_buffer_ while an asynchronous operation that uses that buffer is still in progress is UB

    • having two overlapping async_writes on the same IO object is UB (see the docs)

    The typical way to solve this is to keep a queue of outgoing messages; a sketch follows, and the full version is in the Conclusion below.
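
    A brief sketch of that invariant: at most one async_write is in flight per connection; new messages are only queued, and the completion handler starts the next write.

     // sketch only; see the Conclusion for the full do_write/do_write_loop
     std::deque<std::string> outgoing_; // replaces the single outgoing_buffer_

     void Connection::do_write(std::string msg) { // assumed on the strand
         outgoing_.push_back(std::move(msg));
         if (outgoing_.size() == 1) // no write currently in flight,
             do_write_loop();       // otherwise the running loop picks it up
     }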

  4. Using async_read_some is rarely what you want, in particular because the reads do not accumulate into a dynamic buffer. That means that if your packets get split at unexpected boundaries, you may not detect commands at all, or you may detect them more than once.

    Instead, consider asio::async_read_until with a dynamic buffer, e.g.

    • read directly into a std::string, so you do not have to copy the buffer into a string
    • read into a streambuf, so you can use std::istream(&sbuf_) to parse instead of tokenizing
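
    As a minimal sketch of the streambuf variant, assuming (purely for illustration) newline-delimited commands and a boost::asio::streambuf member sbuf_ in place of the char array; neither of these is in the original code:

     void Connection::do_read() {
         auto self(shared_from_this());
         boost::asio::async_read_until(
             socket_, sbuf_, '\n',
             [this, self](boost::system::error_code ec, std::size_t /*bytes*/) {
                 if (ec) {
                     connection_manager_.stop(shared_from_this());
                     return;
                 }
                 std::istream is(&sbuf_); // parse straight from the accumulated data
                 std::string line;
                 std::getline(is, line);  // exactly one complete command per handler call
                 if (line.rfind("sync", 0) == 0) {
                     // handle the sync request as before
                 }
                 do_read();
             });
     }
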
  5. Concatenating all_json_strs, which obviously owns copies of the text, is wasteful. Instead, use a const-buffer-sequence to combine them all without copying.

    Better yet, consider a streaming approach to JSON serialization, so that not all of the JSON needs to be serialized in memory at any given time.
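
    A sketch of the const-buffer-sequence idea, assuming the individual strings are kept alive until the write completes (here via a hypothetical sync_parts_ member; none of these names exist in the original code):

     std::array<std::string, 4> sync_parts_; // keeps the pieces alive during the write

     void Connection::write_sync_snapshot() { // hypothetical helper
         sync_parts_ = {getExecutionJsons(), getOrdersAsJsons(),
                        getPositionsAsJsons(), createSyncDoneJson()};

         std::vector<boost::asio::const_buffer> bufs;
         for (auto const& part : sync_parts_)
             bufs.push_back(boost::asio::buffer(part)); // no copy of the text itself

         auto self(shared_from_this());
         // a vector of const_buffers is a valid const-buffer-sequence
         boost::asio::async_write(socket_, bufs,
             [this, self](boost::system::error_code ec, std::size_t) {
                 if (ec)
                     connection_manager_.stop(shared_from_this());
             });
     }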

  6. Don't declare empty destructors (~Connection). They are a pessimization.
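
    If you want to keep a declaration for documentation purposes, default it in the header instead (a sketch; simply omitting it is best):

     ~Connection() = default; // better yet: do not declare it at all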

  7. The same goes for empty constructors (ConnectionManager). If you must, consider

    ConnectionManager::ConnectionManager() = default;
    
  8. getNativeHandle raises more questions about other code that might be interfering. E.g. it could indicate other libraries doing operations, which could again lead to overlapping reads/writes, or it could be a sign of more code running on threads (since Server::run() is, by definition, blocking).

  9. The connection manager should probably hold weak_ptrs, so Connections can eventually terminate. Right now the last reference is, by definition, held in the connection manager, meaning nothing ever gets destructed when the peer disconnects or the session fails for some other reason.

  10. This is not idiomatic:

    // Check whether the server was stopped by a signal before this
    // completion handler had a chance to run.
    if (!acceptor_.is_open()) {
        return;
    }
    

    If you closed the acceptor, the completion handler gets called with error::operation_aborted anyway. Simply handle that, e.g. as in the final version I post later:

    // separate strand for each connection - just in case you ever add threads
    acceptor_.async_accept(
        make_strand(io_context_), [this](error_code ec, tcp::socket sock) {
            if (!ec) {
                connection_manager_.register_and_start(
                    std::make_shared<Connection>(std::move(sock),
                                                 connection_manager_));
                do_accept();
            }
        });
    
  11. I note this comment:

    // The server is stopped by cancelling all outstanding asynchronous
    // operations. Once all operations have finished the io_service::run()
    // call will exit.
    

    In fact, you never cancel() anything on any IO object in your code. Again, comments are not executed. It is better to really do as you say, and let the destructors close the resources. That prevents spurious errors when objects are used after close, and also prevents very annoying race conditions where, e.g., you close a handle, some other thread re-opens a new stream on the same file descriptor, and you have already handed the handle to a third party (via getNativeHandle)... You see where this leads?

Reproducing the problem?

Having reviewed it this way, I tried to reproduce the problem, so I created fake data:

    std::string getExecutionJsons()   { return std::string(1024,  'E'); }
    std::string getOrdersAsJsons()    { return std::string(13312, 'O'); }
    std::string getPositionsAsJsons() { return std::string(8192,  'P'); }
    std::string createSyncDoneJson()  { return std::string(24576, 'C'); }

With some minor tweaks to the Connection class:

    std::string buff_str =
        std::string(buffer_.data(), bytes_transferred);
    const auto& tokenized_buffer = split(buff_str, ' ');

    if (!tokenized_buffer.empty() &&
        tokenized_buffer[0] == "sync") {
        std::cerr << "sync detected on " << socket_.remote_endpoint() << std::endl;
        /// "syncing connection" sends a specific text
        /// hence I can separate between syncing and long-lived
        /// connections here and act accordingly.

        const auto& exec_json_strs     = getExecutionJsons();
        const auto& order_json_strs    = getOrdersAsJsons();
        const auto& position_json_strs = getPositionsAsJsons();
        const auto& all_json_strs      = exec_json_strs +
            order_json_strs + position_json_strs +
            createSyncDoneJson();

        std::cerr << "All json length: " << all_json_strs.length() << std::endl;
        /// this is potentially a very large data.
        do_write(all_json_strs); // already on strand!
    }

We get the server output

sync detected on 127.0.0.1:43012
All json length: 47104
sync detected on 127.0.0.1:43044
All json length: 47104

And with a fake client made using netcat:

$ netcat localhost 8989 <<< 'sync me' > expected
^C
$ wc -c expected 
47104 expected

Good. Now let's cause a premature disconnect:

$ netcat localhost 8989 -w0 <<< 'sync me' > truncated
$ wc -c truncated 
0 truncated

So, it does lead to an early close, but the server still says

sync detected on 127.0.0.1:44176
All json length: 47104

Let's also instrument do_write:

    async_write( //
        socket_, boost::asio::buffer(outgoing_buffer_, outgoing_buffer_.size()),
        [/*this,*/ self](error_code ec, size_t transfer_size) {
            std::cerr << "do_write completion: " << transfer_size << " bytes ("
                      << ec.message() << ")" << std::endl;

            if (!ec) {
                /// everything is fine.
            } else {
                /// what to do here?
                // FIXME: probably cancel the read loop so the connection
                // closes?
            }
        });

Now we see:

sync detected on 127.0.0.1:44494
All json length: 47104
do_write completion: 47104 bytes (Success)
sync detected on 127.0.0.1:44512
All json length: 47104
do_write completion: 32768 bytes (Operation canceled)

for one disconnected and one "normal" connection.

No sign of crashes/undefined behaviour. Let's check with -fsanitize=address,undefined: a clean run, even after adding a heartbeat (with Server::deliver changed to return bool, as in the full listing below):

int main() {
    Server s("127.0.0.1", "8989");

    std::thread yolo([&s] {
        using namespace std::literals;
        int i = 1;

        do {
            std::this_thread::sleep_for(5s);
        } while (s.deliver("HEARTBEAT DEMO " + std::to_string(i++)));
    });

    s.run();

    yolo.join();
}

Conclusion

The only problems highlighted above that were not addressed are:

  • the other threading issues that are not shown (possibly via getNativeHandle)

  • the fact that you can have overlapping writes in Connection::do_write. Fixing that:

     void Connection::write(std::string msg) { // public, might not be on the strand
         post(socket_.get_executor(),
              [self = shared_from_this(), msg = std::move(msg)]() mutable {
                  self->do_write(std::move(msg));
              });
     }
    
     void Connection::do_write(std::string msg) { // assumed on the strand
         outgoing_.push_back(std::move(msg));
    
         if (outgoing_.size() == 1)
             do_write_loop();
     }
    
     void Connection::do_write_loop() {
         if (outgoing_.size() == 0)
             return;
    
         auto self(shared_from_this());
         async_write( //
             socket_, boost::asio::buffer(outgoing_.front()),
             [this, self](error_code ec, size_t transfer_size) {
                 std::cerr << "write completion: " << transfer_size << " bytes ("
                           << ec.message() << ")" << std::endl;
    
                 if (!ec) {
                     outgoing_.pop_front();
                     do_write_loop();
                 } else {
                     socket_.cancel();
    
                     // This would ideally be enough to free the connection, but
                     // since `ConnectionManager` doesn't use `weak_ptr` you need to
                     // force the issue using kind of an "umbilical cord reflux":
                     connection_manager_.stop(self);
                 }
             });
     }
    

As you can see, I also split write/do_write to prevent off-strand invocation. The same goes for stop.

Full listing

A complete listing with all of the above remarks/fixes incorporated:

  • File connection.h

     #pragma once
    
     #include <boost/asio.hpp>
    
     #include <array>
     #include <deque>
     #include <memory>
     #include <string>
     using boost::asio::ip::tcp;
    
     class ConnectionManager;
    
     /// Represents a single connection from a client.
     class Connection : public std::enable_shared_from_this<Connection> {
       public:
         Connection(const Connection&) = delete;
         Connection& operator=(const Connection&) = delete;
    
         /// Construct a connection with the given socket.
         explicit Connection(tcp::socket socket, ConnectionManager& manager);
    
         void start();
         void stop();
         void write(std::string msg);
    
       private:
         void do_stop();
         void do_write(std::string msg);
         void do_write_loop();
    
         /// Perform an asynchronous read operation.
         void do_read();
    
         /// Socket for the connection.
         tcp::socket socket_;
    
         /// The manager for this connection.
         ConnectionManager& connection_manager_;
    
         /// Buffer for incoming data.
         std::array<char, 8192> buffer_;
    
         std::deque<std::string> outgoing_;
     };
    
     using connection_ptr = std::shared_ptr<Connection>;
    
  • File connection_manager.h

     #pragma once
    
     #include <list>
     #include "connection.h"
    
     /// Manages open connections so that they may be cleanly stopped when the server
     /// needs to shut down.
     class ConnectionManager {
       public:
         ConnectionManager(const ConnectionManager&) = delete;
         ConnectionManager& operator=(const ConnectionManager&) = delete;
         ConnectionManager() = default; // could be split across h/cpp if you wanted
    
         void register_and_start(connection_ptr c);
         void stop(connection_ptr c);
         void stop_all();
    
         void broadcast(const std::string& buffer);
    
         // purge defunct connections, returns remaining active connections
         size_t garbage_collect();
    
       private:
         using handle = std::weak_ptr<connection_ptr::element_type>;
         std::list<handle> connections_;
     };
    
  • File server.h

     #pragma once
    
     #include <boost/asio.hpp>
     #include <string>
     #include "connection.h"
     #include "connection_manager.h"
    
     class Server {
       public:
         Server(const Server&) = delete;
         Server& operator=(const Server&) = delete;
    
         /// Construct the server to listen on the specified TCP address and port,
         /// and serve up files from the given directory.
         explicit Server(const std::string& address, const std::string& port);
    
         /// Run the server's io_service loop.
         void run();
    
         bool deliver(const std::string& buffer);
    
       private:
         void do_accept();
         void do_await_signal();
    
         boost::asio::io_context      io_context_;
         boost::asio::any_io_executor strand_{io_context_.get_executor()};
         boost::asio::signal_set      signals_{strand_};
         tcp::acceptor                acceptor_{strand_};
         ConnectionManager            connection_manager_;
     };
    
  • File connection.cpp

     #include "connection.h"
    
     #include <boost/algorithm/string.hpp>
     #include <iostream>
     #include <string_view> // used by the split() stub below
     #include <thread>
     #include <utility>
     #include <vector>
    
     #include "connection_manager.h"
     using boost::system::error_code;
    
     Connection::Connection(tcp::socket socket, ConnectionManager& manager)
         : socket_(std::move(socket))
         , connection_manager_(manager) {}
    
     void Connection::start() { // always assumed on the strand (since connection
                                // just constructed)
         do_read();
     }
    
     void Connection::stop() { // public, might not be on the strand
         post(socket_.get_executor(),
              [self = shared_from_this()]() mutable {
                  self->do_stop();
              });
     }
    
     void Connection::do_stop() { // assumed on the strand
         socket_.cancel(); // trust shared pointer to destruct
     }
    
     namespace /*missing code stubs*/ {
         auto split(std::string_view input, char delim) {
             std::vector<std::string_view> result;
             boost::algorithm::split(result, input,
                                     boost::algorithm::is_from_range(delim, delim));
             return result;
         }
    
         std::string getExecutionJsons()   { return std::string(1024,  'E'); }
         std::string getOrdersAsJsons()    { return std::string(13312, 'O'); }
         std::string getPositionsAsJsons() { return std::string(8192,  'P'); }
         std::string createSyncDoneJson()  { return std::string(24576, 'C'); }
     } // namespace
    
     void Connection::do_read() {
         auto self(shared_from_this());
         socket_.async_read_some(
             boost::asio::buffer(buffer_),
             [this, self](error_code ec, size_t bytes_transferred) {
                 if (!ec) {
                     std::string buff_str =
                         std::string(buffer_.data(), bytes_transferred);
                     const auto& tokenized_buffer = split(buff_str, ' ');
    
                     if (!tokenized_buffer.empty() &&
                         tokenized_buffer[0] == "sync") {
                         std::cerr << "sync detected on " << socket_.remote_endpoint() << std::endl;
                         /// "syncing connection" sends a specific text
                         /// hence I can separate between syncing and long-lived
                         /// connections here and act accordingly.
    
                         const auto& exec_json_strs     = getExecutionJsons();
                         const auto& order_json_strs    = getOrdersAsJsons();
                         const auto& position_json_strs = getPositionsAsJsons();
                         const auto& all_json_strs      = exec_json_strs +
                             order_json_strs + position_json_strs +
                             createSyncDoneJson();
    
                         std::cerr << "All json length: " << all_json_strs.length() << std::endl;
                         /// this is potentially a very large data.
                         do_write(all_json_strs); // already on strand!
                     }
    
                     do_read();
                 } else {
                     std::cerr << "do_read terminating: " << ec.message() << std::endl;
                     connection_manager_.stop(shared_from_this());
                 }
             });
     }
    
     void Connection::write(std::string msg) { // public, might not be on the strand
         post(socket_.get_executor(),
              [self = shared_from_this(), msg = std::move(msg)]() mutable {
                  self->do_write(std::move(msg));
              });
     }
    
     void Connection::do_write(std::string msg) { // assumed on the strand
         outgoing_.push_back(std::move(msg));
    
         if (outgoing_.size() == 1)
             do_write_loop();
     }
    
     void Connection::do_write_loop() {
         if (outgoing_.size() == 0)
             return;
    
         auto self(shared_from_this());
         async_write( //
             socket_, boost::asio::buffer(outgoing_.front()),
             [this, self](error_code ec, size_t transfer_size) {
                 std::cerr << "write completion: " << transfer_size << " bytes ("
                           << ec.message() << ")" << std::endl;
    
                 if (!ec) {
                     outgoing_.pop_front();
                     do_write_loop();
                 } else {
                     socket_.cancel();
    
                     // This would ideally be enough to free the connection, but
                     // since `ConnectionManager` doesn't use `weak_ptr` you need to
                     // force the issue using kind of an "umbilical cord reflux":
                     connection_manager_.stop(self);
                 }
             });
     }
    
  • File connection_manager.cpp

     #include "connection_manager.h"
    
     void ConnectionManager::register_and_start(connection_ptr c) {
         connections_.emplace_back(c);
         c->start();
     }
    
     void ConnectionManager::stop(connection_ptr c) {
         c->stop();
     }
    
     void ConnectionManager::stop_all() {
         for (auto h : connections_)
             if (auto c = h.lock())
                 c->stop();
     }
    
     /// this function is used to keep clients up to date with the changes, not used
     /// during syncing phase.
     void ConnectionManager::broadcast(const std::string& buffer) {
         for (auto h : connections_)
             if (auto c = h.lock())
                 c->write(buffer);
     }
    
     size_t ConnectionManager::garbage_collect() {
         connections_.remove_if(std::mem_fn(&handle::expired));
         return connections_.size();
     }
    
  • File server.cpp

     #include "server.h"
     #include <signal.h>
     #include <utility>
    
     using boost::system::error_code;
    
     Server::Server(const std::string& address, const std::string& port)
         : io_context_(1) // THREAD HINT: single threaded
         , connection_manager_()
     {
         // Register to handle the signals that indicate when the server should exit.
         // It is safe to register for the same signal multiple times in a program,
         // provided all registration for the specified signal is made through Asio.
         signals_.add(SIGINT);
         signals_.add(SIGTERM);
     #if defined(SIGQUIT)
         signals_.add(SIGQUIT);
     #endif // defined(SIGQUIT)
    
         do_await_signal();
    
         // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
         tcp::resolver resolver(io_context_);
         tcp::endpoint endpoint = *resolver.resolve({address, port});
         acceptor_.open(endpoint.protocol());
         acceptor_.set_option(tcp::acceptor::reuse_address(true));
         acceptor_.bind(endpoint);
         acceptor_.listen();
    
         do_accept();
     }
    
     void Server::run() {
         // The io_service::run() call will block until all asynchronous operations
         // have finished. While the server is running, there is always at least one
         // asynchronous operation outstanding: the asynchronous accept call waiting
         // for new incoming connections.
         io_context_.run();
     }
    
     void Server::do_accept() {
         // separate strand for each connection - just in case you ever add threads
         acceptor_.async_accept(
             make_strand(io_context_), [this](error_code ec, tcp::socket sock) {
                 if (!ec) {
                     connection_manager_.register_and_start(
                         std::make_shared<Connection>(std::move(sock),
                                                      connection_manager_));
                     do_accept();
                 }
             });
     }
    
     void Server::do_await_signal() {
         signals_.async_wait([this](error_code /*ec*/, int /*signo*/) {
             // handler on the strand_ because of the executor on signals_
    
             // The server is stopped by cancelling all outstanding asynchronous
             // operations. Once all operations have finished the io_service::run()
             // call will exit.
             acceptor_.cancel();
             connection_manager_.stop_all();
         });
     }
    
     bool Server::deliver(const std::string& buffer) {
         if (io_context_.stopped()) {
             return false;
         }
         post(io_context_,
              [this, buffer] { connection_manager_.broadcast(std::move(buffer)); });
         return true;
     }
    
  • File test.cpp

     #include "server.h"
    
     int main() {
         Server s("127.0.0.1", "8989");
    
         std::thread yolo([&s] {
             using namespace std::literals;
             int i = 1;
    
             do {
                 std::this_thread::sleep_for(5s);
             } while (s.deliver("HEARTBEAT DEMO " + std::to_string(i++)));
         });
    
         s.run();
    
         yolo.join();
     }