服务器在被中断发送大量数据时崩溃
Server crashing while being interrupted sending large chunk of data
当我正常关闭一个已连接、且正在接收大量数据的客户端时,我的服务器崩溃了。我怀疑这可能是一个对象生命周期(lifetime)错误,就像 boost ASIO 中的大多数错误一样,但是我自己找不出错在哪里。
每个客户端与服务器建立 2 个连接,其中一个用于同步,另一个是长期连接以接收持续更新。在“同步阶段”,客户端接收大量数据以与服务器状态同步(“状态”基本上是 JSON 格式的数据库数据)。同步后,同步连接关闭。客户端通过其他连接接收对数据库的更新(与“同步数据”相比,这些数据当然是非常小的数据)。
这些是相关文件:
connection.h
#pragma once
#include <array>
#include <memory>
#include <string>
#include <boost/asio.hpp>
class ConnectionManager;
/// Represents a single connection from a client.
///
/// Derives from enable_shared_from_this so member functions can obtain a
/// shared_ptr to the object and capture it in completion handlers.
class Connection : public std::enable_shared_from_this<Connection>
{
public:
/// Connections are neither copy-constructible nor copy-assignable.
Connection(const Connection&) = delete;
Connection& operator=(const Connection&) = delete;
/// Construct a connection with the given socket.
explicit Connection(boost::asio::ip::tcp::socket socket, ConnectionManager& manager);
/// Start the first asynchronous operation for the connection.
void start();
/// Stop all asynchronous operations associated with the connection.
void stop();
/// Perform an asynchronous write operation.
void do_write(const std::string& buffer);
/// Declared here; its definition is not part of the files shown.
int getNativeHandle();
~Connection();
private:
/// Perform an asynchronous read operation.
void do_read();
/// Socket for the connection.
boost::asio::ip::tcp::socket socket_;
/// The manager for this connection.
ConnectionManager& connection_manager_;
/// Buffer for incoming data.
std::array<char, 8192> buffer_;
/// Copy of the message passed to the most recent do_write() call.
std::string outgoing_buffer_;
};
typedef std::shared_ptr<Connection> connection_ptr;
connection.cpp
#include "connection.h"
#include <utility>
#include <vector>
#include <iostream>
#include <thread>
#include "connection_manager.h"
/// Construct a connection: takes over the given socket and keeps a reference
/// to the manager.
Connection::Connection(boost::asio::ip::tcp::socket socket, ConnectionManager& manager)
: socket_(std::move(socket))
, connection_manager_(manager)
{
}
/// Begin servicing the connection by issuing the first asynchronous read.
void Connection::start()
{
do_read();
}
/// Stop the connection by closing its socket.
void Connection::stop()
{
socket_.close();
}
/// User-provided destructor with an empty body.
Connection::~Connection()
{
}
/// Issue one asynchronous read into buffer_ and handle its completion.
/// On success the received bytes are split on spaces; if the first token is
/// "sync" the full state is sent with do_write(). Another read is then issued.
/// On error the connection is handed to connection_manager_.stop().
void Connection::do_read()
{
// Copied into the handler below together with `this`.
auto self(shared_from_this());
socket_.async_read_some(boost::asio::buffer(buffer_), [this, self](boost::system::error_code ec, std::size_t bytes_transferred) {
if (!ec) {
// Only the bytes received by this particular read are inspected.
std::string buff_str = std::string(buffer_.data(), bytes_transferred);
const auto& tokenized_buffer = split(buff_str, ' ');
if(!tokenized_buffer.empty() && tokenized_buffer[0] == "sync") {
/// "syncing connection" sends a specific text
/// hence I can separate between syncing and long-lived connections here and act accordingly.
const auto& exec_json_strs = getExecutionJsons();
const auto& order_json_strs = getOrdersAsJsons();
const auto& position_json_strs = getPositionsAsJsons();
const auto& all_json_strs = exec_json_strs + order_json_strs + position_json_strs + createSyncDoneJson();
/// this is potentially a very large data.
do_write(all_json_strs);
}
do_read();
} else {
connection_manager_.stop(shared_from_this());
}
});
}
/// Copy write_buffer into outgoing_buffer_ and start an asynchronous write of
/// the whole copy.
/// NOTE(review): outgoing_buffer_ is overwritten unconditionally; nothing here
/// checks whether an earlier async_write using it is still in progress.
void Connection::do_write(const std::string& write_buffer)
{
outgoing_buffer_ = write_buffer;
// Captured by the handler below.
auto self(shared_from_this());
boost::asio::async_write(socket_, boost::asio::buffer(outgoing_buffer_, outgoing_buffer_.size()), [this, self](boost::system::error_code ec, std::size_t transfer_size) {
if (!ec) {
/// everything is fine.
} else {
/// what to do here?
/// server crashes once I get error code 32 (EPIPE) here.
}
});
}
connection_manager.h
#pragma once
#include <set>
#include "connection.h"
/// Manages open connections so that they may be cleanly stopped when the server
/// needs to shut down.
class ConnectionManager
{
public:
/// The manager is neither copy-constructible nor copy-assignable.
ConnectionManager(const ConnectionManager&) = delete;
ConnectionManager& operator=(const ConnectionManager&) = delete;
/// Construct a connection manager.
ConnectionManager();
/// Add the specified connection to the manager and start it.
void start(connection_ptr c);
/// Stop the specified connection.
void stop(connection_ptr c);
/// Stop all connections.
void stop_all();
/// Call do_write(buffer) on every managed connection.
void sendAllConnections(const std::string& buffer);
private:
/// The managed connections.
std::set<connection_ptr> connections_;
};
connection_manager.cpp
#include "connection_manager.h"
/// User-provided default constructor with an empty body.
ConnectionManager::ConnectionManager()
{
}
/// Add the connection to the managed set, then start it.
void ConnectionManager::start(connection_ptr c)
{
connections_.insert(c);
c->start();
}
/// Remove the connection from the managed set, then stop it.
void ConnectionManager::stop(connection_ptr c)
{
connections_.erase(c);
c->stop();
}
/// Stop every managed connection, then empty the managed set.
void ConnectionManager::stop_all()
{
for (auto c: connections_)
c->stop();
connections_.clear();
}
/// this function is used to keep clients up to date with the changes, not used during syncing phase.
/// Calls do_write(buffer) on each managed connection in turn.
void ConnectionManager::sendAllConnections(const std::string& buffer)
{
for (auto c: connections_)
c->do_write(buffer);
}
server.h
#pragma once
#include <boost/asio.hpp>
#include <string>
#include "connection.h"
#include "connection_manager.h"
/// Owns the io_service, the signal set, the acceptor and the connection manager.
class Server
{
public:
/// The server is neither copy-constructible nor copy-assignable.
Server(const Server&) = delete;
Server& operator=(const Server&) = delete;
/// Construct the server to listen on the specified TCP address and port.
explicit Server(const std::string& address, const std::string& port);
/// Run the server's io_service loop.
void run();
/// Forward buffer to every connection via the connection manager.
void deliver(const std::string& buffer);
private:
/// Perform an asynchronous accept operation.
void do_accept();
/// Wait for a request to stop the server.
void do_await_stop();
/// The io_service used to perform asynchronous operations.
boost::asio::io_service io_service_;
/// The signal_set is used to register for process termination notifications.
boost::asio::signal_set signals_;
/// Acceptor used to listen for incoming connections.
boost::asio::ip::tcp::acceptor acceptor_;
/// The connection manager which owns all live connections.
ConnectionManager connection_manager_;
/// The *NEXT* socket to be accepted.
boost::asio::ip::tcp::socket socket_;
};
server.cpp
#include "server.h"
#include <signal.h>
#include <utility>
/// Construct the server: register the termination signals and start waiting
/// for them, bind and listen on address:port, and queue the first accept.
Server::Server(const std::string& address, const std::string& port)
: io_service_()
, signals_(io_service_)
, acceptor_(io_service_)
, connection_manager_()
, socket_(io_service_)
{
// Register to handle the signals that indicate when the server should exit.
// It is safe to register for the same signal multiple times in a program,
// provided all registration for the specified signal is made through Asio.
signals_.add(SIGINT);
signals_.add(SIGTERM);
#if defined(SIGQUIT)
signals_.add(SIGQUIT);
#endif // defined(SIGQUIT)
do_await_stop();
// Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
boost::asio::ip::tcp::resolver resolver(io_service_);
// The first resolver result is used as the listening endpoint.
boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve({address, port});
acceptor_.open(endpoint.protocol());
acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
do_accept();
}
/// Run the io_service event loop on the calling thread.
void Server::run()
{
// The io_service::run() call will block until all asynchronous operations
// have finished. While the server is running, there is always at least one
// asynchronous operation outstanding: the asynchronous accept call waiting
// for new incoming connections.
io_service_.run();
}
/// Accept one connection into socket_, hand it to the connection manager on
/// success, and queue the next accept.
void Server::do_accept()
{
acceptor_.async_accept(socket_,
[this](boost::system::error_code ec)
{
// Check whether the server was stopped by a signal before this
// completion handler had a chance to run.
if (!acceptor_.is_open())
{
return;
}
if (!ec)
{
// socket_ is moved into the new Connection.
connection_manager_.start(std::make_shared<Connection>(
std::move(socket_), connection_manager_));
}
// Reached for both success and failure, so accepting continues after an error.
do_accept();
});
}
/// Wait for one of the registered signals, then close the acceptor and stop
/// all connections. The error code and signal number are ignored.
void Server::do_await_stop()
{
signals_.async_wait(
[this](boost::system::error_code /*ec*/, int /*signo*/)
{
// The server is stopped by cancelling all outstanding asynchronous
// operations. Once all operations have finished the io_service::run()
// call will exit.
acceptor_.close();
connection_manager_.stop_all();
});
}
/// this function is used to keep clients up to date with the changes, not used during syncing phase.
/// Calls ConnectionManager::sendAllConnections() directly on the calling thread.
void Server::deliver(const std::string& buffer)
{
connection_manager_.sendAllConnections(buffer);
}
所以,我再重复一遍我的问题:当我正常关闭一个已连接、且正在接收大量数据的客户端时,我的服务器崩溃了,而我不知道为什么。
编辑:当我收到 EPIPE 错误时,async_write 函数发生崩溃。该应用程序是多线程的。有 4 个线程在生成数据时使用它们自己的数据调用 Server::deliver。 deliver() 用于使客户端保持最新状态,它与初始同步无关:同步是通过从数据库中获取的持久数据完成的。
我只有一个 io_service,所以我认为我不需要 strand。io_service::run 在主线程上调用,因此主线程处于阻塞状态。
在审查代码时,我先补上了一些缺失的代码片段:
namespace /*missing code stubs*/ {
// Split `input` at every `delim` character and return the pieces.
// NOTE(review): the string_view results presumably alias `input`'s storage,
// so that storage must outlive them — confirm.
auto split(std::string_view input, char delim) {
std::vector<std::string_view> result;
boost::algorithm::split(result, input,
boost::algorithm::is_from_range(delim, delim));
return result;
}
// Placeholder data sources: each one returns an empty string.
std::string getExecutionJsons() { return ""; }
std::string getOrdersAsJsons() { return ""; }
std::string getPositionsAsJsons() { return ""; }
std::string createSyncDoneJson() { return ""; }
}
现在我注意到的是:
你只有一个 io_service
,并且只在一个线程上运行它。好的,那么除非您的其他代码中还有线程(main
,例如?),否则不需要 strand。
我怀疑有其他线程参与的一个具体原因是:如果没有其他线程,就不可能有人调用 Server::deliver
,因为 run()
是阻塞的。这意味着现在每次调用 deliver()
都会造成数据竞争,从而导致未定义行为(Undefined Behaviour)。
这句随手写的注释:
/// this function is used to keep clients up to date with the changes,
/// not used during syncing phase.
并不能消除这种担忧。代码本身需要防止被误用,而注释是不会被执行的。更好的做法是:
/// Hand a broadcast over to the io_context instead of touching the connections
/// from the calling thread.
///
/// @param buffer  Message to broadcast; it is copied into the posted handler,
///                so the caller's string need not outlive this call.
void Server::deliver(const std::string& buffer) {
    // The by-copy capture is const inside a non-mutable lambda, so wrapping it
    // in std::move would not move anything; broadcast() takes a const reference.
    post(io_context_,
         [this, buffer] { connection_manager_.broadcast(buffer); });
}
您在接受“新”写入之前没有检查之前的写入是否已完成。这意味着调用 Connection::do_write
会导致 Undefined Behaviour,原因有两个:
在使用该缓冲区的正在进行的异步操作期间修改 outgoing_buffer_
是 UB
在同一个 IO 对象上有两个重叠的 async_write
是 UB(参见 docs
解决这个问题的典型方法是使用一个传出消息队列。
使用 async_read_some
很少是您想要的,特别是因为读取到的数据不会累积到动态缓冲区中。这意味着如果您的数据包在意想不到的边界处被拆分,您可能根本检测不到命令,或者检测出错误的命令。
而是考虑 asio::async_read_until
并配合动态缓冲区使用,例如:
- 直接读入
std::string
,因此您不必将缓冲区复制到字符串中
- 读入
streambuf
以便您可以使用 std::istream(&sbuf_)
来解析而不是标记化
把这些显然必须是拥有型(owning)文本容器的字符串拼接成 all_json_strs
是一种浪费。相反,可以使用 const-buffer-sequence 将它们组合起来而不进行复制。
更好的是,考虑一种 JSON 序列化的流式方法,这样在任何给定时间并不是所有 JSON 都需要在内存中序列化。
不要声明空析构函数 (~Connection
)。它们属于负优化(pessimization)。
同样适用于空构造函数 (ConnectionManager
)。如果必须,请考虑
ConnectionManager::ConnectionManager() = default;
getNativeHandle
给我更多关于其他可能干扰的代码的问题。例如。它可能表明其他库正在执行操作,这又会导致重叠 reads/writes,或者它可能是线程上有更多代码的迹象(因为 Server::run()
根据定义是阻塞的)
连接管理器可能会保持 weak_ptr
,因此 Connection
s 最终可能会终止。现在,最后一个引用 通过定义 保存在连接管理器中,这意味着当对等方断开连接或会话因其他原因失败时,不会破坏任何内容。
这不是惯用写法:
// Check whether the server was stopped by a signal before this
// completion handler had a chance to run.
if (!acceptor_.is_open()) {
return;
}
如果您关闭了接受器,完成处理程序无论如何都会以 error::operation_aborted 被调用
。直接处理这个错误即可,例如我稍后贴出的最终版本:
// separate strand for each connection - just in case you ever add threads
acceptor_.async_accept(
make_strand(io_context_), [this](error_code ec, tcp::socket sock) {
if (!ec) {
connection_manager_.register_and_start(
std::make_shared<Connection>(std::move(sock),
connection_manager_));
do_accept();
}
});
我注意到这条评论:
// The server is stopped by cancelling all outstanding asynchronous
// operations. Once all operations have finished the io_service::run()
// call will exit.
事实上,您的代码从未对任何 IO 对象上的任何操作调用过 cancel()
。同样,注释是不会被执行的。最好真的按照注释所说的去做(取消操作),并让析构函数去关闭资源。这可以避免对象在关闭后仍被使用(use-after-close)时产生的虚假错误,也可以避免非常烦人的竞争条件:例如您关闭了句柄,另一个线程在同一个文件描述符上重新打开了新的流,而您又已经把该句柄交给了第三方(通过 getNativeHandle
)……您能想到这会导致什么后果吧?
重现问题?
通过这种方式进行审查后,我试图重现该问题,因此我创建了虚假数据:
// Fake payloads for reproducing the problem: each one is a fixed-length run
// of a single letter (47104 bytes in total across the four functions).
std::string getExecutionJsons()
{
    std::string payload(1024, 'E');
    return payload;
}

std::string getOrdersAsJsons()
{
    std::string payload(13312, 'O');
    return payload;
}

std::string getPositionsAsJsons()
{
    std::string payload(8192, 'P');
    return payload;
}

std::string createSyncDoneJson()
{
    std::string payload(24576, 'C');
    return payload;
}
对连接进行一些小调整 class:
std::string buff_str =
std::string(buffer_.data(), bytes_transferred);
const auto& tokenized_buffer = split(buff_str, ' ');
if (!tokenized_buffer.empty() &&
tokenized_buffer[0] == "sync") {
std::cerr << "sync detected on " << socket_.remote_endpoint() << std::endl;
/// "syncing connection" sends a specific text
/// hence I can separate between sycing and long-lived
/// connections here and act accordingly.
const auto& exec_json_strs = getExecutionJsons();
const auto& order_json_strs = getOrdersAsJsons();
const auto& position_json_strs = getPositionsAsJsons();
const auto& all_json_strs = exec_json_strs +
order_json_strs + position_json_strs +
createSyncDoneJson();
std::cerr << "All json length: " << all_json_strs.length() << std::endl;
/// this is potentially a very large data.
do_write(all_json_strs); // already on strand!
}
我们得到服务器输出
sync detected on 127.0.0.1:43012
All json length: 47104
sync detected on 127.0.0.1:43044
All json length: 47104
并且使用 netcat 伪造的客户端:
$ netcat localhost 8989 <<< 'sync me' > expected
^C
$ wc -c expected
47104 expected
很好。现在让我们过早断开连接:
netcat localhost 8989 -w0 <<< 'sync me' > truncated
$ wc -c truncated
0 truncated
所以,它确实导致提前关闭,但服务器仍然说
sync detected on 127.0.0.1:44176
All json length: 47104
让我们也检测 do_write
:
async_write( //
socket_, boost::asio::buffer(outgoing_buffer_, outgoing_buffer_.size()),
[/*this,*/ self](error_code ec, size_t transfer_size) {
std::cerr << "do_write completion: " << transfer_size << " bytes ("
<< ec.message() << ")" << std::endl;
if (!ec) {
/// everything is fine.
} else {
/// what to do here?
// FIXME: probably cancel the read loop so the connection
// closes?
}
});
现在我们看到:
sync detected on 127.0.0.1:44494
All json length: 47104
do_write completion: 47104 bytes (Success)
sync detected on 127.0.0.1:44512
All json length: 47104
do_write completion: 32768 bytes (Operation canceled)
对于一个断开连接和一个“正常”连接。
没有崩溃或未定义行为的迹象。让我们用 -fsanitize=address,undefined
来检查:结果完全干净,即使加上心跳广播也是如此:
// Demo driver: runs the server on the main thread while a second thread
// calls deliver() with a numbered heartbeat message every 5 seconds.
int main() {
Server s("127.0.0.1", "8989");
std::thread yolo([&s] {
using namespace std::literals;
int i = 1;
// Sleep first, then deliver; the loop ends as soon as deliver() returns false.
do {
std::this_thread::sleep_for(5s);
} while (s.deliver("HEARTBEAT DEMO " + std::to_string(i++)));
});
s.run();
yolo.join();
}
结论
上面突出显示但未解决的唯一问题是:
未显示其他线程问题(可能通过 getNativeHandle
)
您可以在连接中进行重叠写入这一事实 do_write
。修正:
/// Public entry point for sending a message; the caller might not be on the
/// strand, so the enqueue itself is posted to the socket's executor.
void Connection::write(std::string msg)
{
    auto enqueue = [self = shared_from_this(), msg = std::move(msg)]() mutable {
        self->do_write(std::move(msg));
    };
    post(socket_.get_executor(), std::move(enqueue));
}
/// Queue a message for sending; assumed to run on the strand.
/// The write loop is started only when the queue was empty beforehand.
void Connection::do_write(std::string msg)
{
    const bool was_idle = outgoing_.empty();
    outgoing_.push_back(std::move(msg));
    if (was_idle) {
        do_write_loop();
    }
}
/// Write the message at the front of outgoing_; on success drop it and continue
/// with the next one. On failure cancel the socket's pending operations and ask
/// the manager to stop this connection.
void Connection::do_write_loop() {
if (outgoing_.size() == 0)
return;
// Captured by the completion handler below.
auto self(shared_from_this());
async_write( //
socket_, boost::asio::buffer(outgoing_.front()),
[this, self](error_code ec, size_t transfer_size) {
std::cerr << "write completion: " << transfer_size << " bytes ("
<< ec.message() << ")" << std::endl;
if (!ec) {
outgoing_.pop_front();
do_write_loop();
} else {
socket_.cancel();
// This would ideally be enough to free the connection, but
// since `ConnectionManager` doesn't use `weak_ptr` you need to
// force the issue using kind of an "umbilical cord reflux":
connection_manager_.stop(self);
}
});
}
如您所见,我还拆分了 write
/do_write
,以防止在 strand 之外(off-strand)调用。stop
也做了同样的处理。
完整列表
包含以上所有 remarks/fixes 的完整列表:
文件connection.h
#pragma once
#include <boost/asio.hpp>
#include <array>
#include <deque>
#include <memory>
#include <string>
using boost::asio::ip::tcp;
class ConnectionManager;
/// Represents a single connection from a client.
/// Shared-owned connection object; its public stop()/write() post their work
/// to the socket's executor.
class Connection : public std::enable_shared_from_this<Connection> {
public:
/// Connections are neither copy-constructible nor copy-assignable.
Connection(const Connection&) = delete;
Connection& operator=(const Connection&) = delete;
/// Construct a connection with the given socket.
explicit Connection(tcp::socket socket, ConnectionManager& manager);
/// Start the read loop.
void start();
/// Request cancellation of the socket's pending operations; the request is
/// posted to the socket's executor.
void stop();
/// Queue msg for sending; the enqueue is posted to the socket's executor.
void write(std::string msg);
private:
/// Cancel pending operations on the socket.
void do_stop();
/// Append msg to outgoing_ and start the write loop if it is the only entry.
void do_write(std::string msg);
/// Write the front of outgoing_, then continue with the next queued message.
void do_write_loop();
/// Perform an asynchronous read operation.
void do_read();
/// Socket for the connection.
tcp::socket socket_;
/// The manager for this connection.
ConnectionManager& connection_manager_;
/// Buffer for incoming data.
std::array<char, 8192> buffer_;
/// Messages waiting to be written, oldest first.
std::deque<std::string> outgoing_;
};
using connection_ptr = std::shared_ptr<Connection>;
文件connection_manager.h
#pragma once
#include <list>
#include "connection.h"
/// Manages open connections so that they may be cleanly stopped when the server
/// needs to shut down.
/// Tracks connections through weak_ptr handles, so the manager itself does not
/// keep them alive.
class ConnectionManager {
public:
/// The manager is neither copy-constructible nor copy-assignable.
ConnectionManager(const ConnectionManager&) = delete;
ConnectionManager& operator=(const ConnectionManager&) = delete;
ConnectionManager() = default; // could be split across h/cpp if you wanted
/// Record a handle to c and start it.
void register_and_start(connection_ptr c);
/// Stop the given connection.
void stop(connection_ptr c);
/// Stop every tracked connection that is still alive.
void stop_all();
/// Send buffer to every tracked connection that is still alive.
void broadcast(const std::string& buffer);
// purge defunct connections, returns remaining active connections
size_t garbage_collect();
private:
/// Non-owning handle to a Connection.
using handle = std::weak_ptr<connection_ptr::element_type>;
/// Handles of all registered connections, in registration order.
std::list<handle> connections_;
};
文件server.h
#pragma once
#include <boost/asio.hpp>
#include <string>
#include "connection.h"
#include "connection_manager.h"
/// Owns the io_context, the signal set, the acceptor and the connection manager.
class Server {
public:
/// The server is neither copy-constructible nor copy-assignable.
Server(const Server&) = delete;
Server& operator=(const Server&) = delete;
/// Construct the server to listen on the specified TCP address and port.
explicit Server(const std::string& address, const std::string& port);
/// Run the server's io_context loop.
void run();
/// Post buffer for broadcast to all connections.
/// Returns false, without posting, once the io_context has stopped.
bool deliver(const std::string& buffer);
private:
/// Perform an asynchronous accept operation.
void do_accept();
/// Wait for a termination signal, then cancel the acceptor and stop all
/// connections.
void do_await_signal();
boost::asio::io_context io_context_;
// NOTE(review): despite its name this is initialised with the io_context's
// plain executor rather than with make_strand(...) — confirm that is intended.
boost::asio::any_io_executor strand_{io_context_.get_executor()};
boost::asio::signal_set signals_{strand_};
tcp::acceptor acceptor_{strand_};
ConnectionManager connection_manager_;
};
文件connection.cpp
#include "connection.h"
#include <boost/algorithm/string.hpp>
#include <iostream>
#include <thread>
#include <utility>
#include <vector>
#include "connection_manager.h"
using boost::system::error_code;
/// Adopt the given socket and remember the manager this connection reports to.
Connection::Connection(tcp::socket socket, ConnectionManager& manager)
    : socket_(std::move(socket)), connection_manager_(manager)
{
}
/// Begin servicing the connection by launching the read loop.
/// Always assumed to run on the strand, since the connection has only just
/// been constructed.
void Connection::start()
{
    do_read();
}
void Connection::stop() { // public, might not be on the strand
post(socket_.get_executor(),
[self = shared_from_this()]() mutable {
self->do_stop();
});
}
/// Strand-side half of stop(): cancels the socket's pending operations.
void Connection::do_stop()
{
    // The socket is not closed here; the shared pointer is trusted to
    // destruct the connection.
    socket_.cancel();
}
namespace /*missing code stubs*/ {

/// Split `input` at every `delim` character and return the pieces as views.
std::vector<std::string_view> split(std::string_view input, char delim)
{
    std::vector<std::string_view> pieces;
    const auto is_delim = boost::algorithm::is_from_range(delim, delim);
    boost::algorithm::split(pieces, input, is_delim);
    return pieces;
}

// Fake payloads: each one is a fixed-length run of a single letter.
std::string getExecutionJsons()
{
    std::string payload(1024, 'E');
    return payload;
}

std::string getOrdersAsJsons()
{
    std::string payload(13312, 'O');
    return payload;
}

std::string getPositionsAsJsons()
{
    std::string payload(8192, 'P');
    return payload;
}

std::string createSyncDoneJson()
{
    std::string payload(24576, 'C');
    return payload;
}

} // namespace
void Connection::do_read() {
auto self(shared_from_this());
socket_.async_read_some(
boost::asio::buffer(buffer_),
[this, self](error_code ec, size_t bytes_transferred) {
if (!ec) {
std::string buff_str =
std::string(buffer_.data(), bytes_transferred);
const auto& tokenized_buffer = split(buff_str, ' ');
if (!tokenized_buffer.empty() &&
tokenized_buffer[0] == "sync") {
std::cerr << "sync detected on " << socket_.remote_endpoint() << std::endl;
/// "syncing connection" sends a specific text
/// hence I can separate between sycing and long-lived
/// connections here and act accordingly.
const auto& exec_json_strs = getExecutionJsons();
const auto& order_json_strs = getOrdersAsJsons();
const auto& position_json_strs = getPositionsAsJsons();
const auto& all_json_strs = exec_json_strs +
order_json_strs + position_json_strs +
createSyncDoneJson();
std::cerr << "All json length: " << all_json_strs.length() << std::endl;
/// this is potentially a very large data.
do_write(all_json_strs); // already on strand!
}
do_read();
} else {
std::cerr << "do_read terminating: " << ec.message() << std::endl;
connection_manager_.stop(shared_from_this());
}
});
}
/// Public entry point for sending a message; the caller might not be on the
/// strand, so the enqueue itself is posted to the socket's executor.
void Connection::write(std::string msg)
{
    auto enqueue = [self = shared_from_this(), msg = std::move(msg)]() mutable {
        self->do_write(std::move(msg));
    };
    post(socket_.get_executor(), std::move(enqueue));
}
/// Queue a message for sending; assumed to run on the strand.
/// The write loop is started only when the queue was empty beforehand.
void Connection::do_write(std::string msg)
{
    const bool was_idle = outgoing_.empty();
    outgoing_.push_back(std::move(msg));
    if (was_idle) {
        do_write_loop();
    }
}
/// Write the message at the front of outgoing_; on success drop it and continue
/// with the next one. On failure cancel the socket's pending operations and ask
/// the manager to stop this connection.
void Connection::do_write_loop() {
if (outgoing_.size() == 0)
return;
// Captured by the completion handler below.
auto self(shared_from_this());
async_write( //
socket_, boost::asio::buffer(outgoing_.front()),
[this, self](error_code ec, size_t transfer_size) {
std::cerr << "write completion: " << transfer_size << " bytes ("
<< ec.message() << ")" << std::endl;
if (!ec) {
outgoing_.pop_front();
do_write_loop();
} else {
socket_.cancel();
// Cancelling the pending operations would ideally be enough to free
// the connection. ConnectionManager::stop() is still called, which
// is the same call a failed read makes in do_read().
connection_manager_.stop(self);
}
});
}
文件connection_manager.cpp
#include "connection_manager.h"
/// Track a new connection and start it.
///
/// @param c  The connection to register; only a weak handle to it is stored.
void ConnectionManager::register_and_start(connection_ptr c) {
    // Nothing else in the code shown calls garbage_collect(), so without this
    // the list would keep one dead handle for every connection ever accepted.
    garbage_collect();
    connections_.emplace_back(c);
    c->start();
}
/// Ask a single connection to shut itself down. Its handle is left in
/// connections_.
void ConnectionManager::stop(connection_ptr c)
{
    c->stop();
}
/// Stop every tracked connection that is still alive.
void ConnectionManager::stop_all() {
    // Iterate by reference: copying each weak_ptr would cost a reference-count
    // update per element for no benefit.
    for (const auto& h : connections_) {
        if (auto c = h.lock()) {
            c->stop();
        }
    }
}
/// this function is used to keep clients up to date with the changes, not used
/// during syncing phase.
///
/// @param buffer  Message passed to write() on every connection still alive.
void ConnectionManager::broadcast(const std::string& buffer) {
    // Iterate by reference: copying each weak_ptr would cost a reference-count
    // update per element for no benefit.
    for (const auto& h : connections_) {
        if (auto c = h.lock()) {
            c->write(buffer);
        }
    }
}
size_t ConnectionManager::garbage_collect() {
connections_.remove_if(std::mem_fn(&handle::expired));
return connections_.size();
}
文件server.cpp
#include "server.h"
#include <signal.h>
#include <utility>
using boost::system::error_code;
/// Set up signal handling and the listening socket, then queue the first accept.
Server::Server(const std::string& address, const std::string& port)
    : io_context_(1) // THREAD HINT: single threaded
    , connection_manager_()
{
    // Termination signals. Registering the same signal more than once in a
    // program is safe as long as every registration goes through Asio.
    signals_.add(SIGINT);
    signals_.add(SIGTERM);
#if defined(SIGQUIT)
    signals_.add(SIGQUIT);
#endif // defined(SIGQUIT)
    do_await_signal();

    // Listen on the first resolved endpoint, with SO_REUSEADDR enabled.
    tcp::resolver resolver(io_context_);
    tcp::endpoint listen_endpoint = *resolver.resolve({address, port});
    acceptor_.open(listen_endpoint.protocol());
    acceptor_.set_option(tcp::acceptor::reuse_address(true));
    acceptor_.bind(listen_endpoint);
    acceptor_.listen();

    do_accept();
}
/// Block the calling thread in io_context::run() until all asynchronous
/// operations have finished. While the server is running there is always at
/// least one outstanding: the asynchronous accept waiting for new connections.
void Server::run()
{
    io_context_.run();
}
void Server::do_accept() {
// separate strand for each connection - just in case you ever add threads
acceptor_.async_accept(
make_strand(io_context_), [this](error_code ec, tcp::socket sock) {
if (!ec) {
connection_manager_.register_and_start(
std::make_shared<Connection>(std::move(sock),
connection_manager_));
do_accept();
}
});
}
/// Wait for one of the registered signals, then shut the server down.
/// The error code and the signal number are both ignored.
void Server::do_await_signal()
{
    // The handler runs on strand_ because that is the executor signals_ uses.
    auto on_signal = [this](error_code /*ec*/, int /*signo*/) {
        // Shutdown works by cancelling all outstanding asynchronous
        // operations; once they have all finished, run() returns.
        acceptor_.cancel();
        connection_manager_.stop_all();
    };
    signals_.async_wait(std::move(on_signal));
}
/// Queue a broadcast of `buffer` to all connections.
///
/// @param buffer  Message to broadcast; it is copied into the posted handler.
/// @return false, without posting anything, if the io_context has stopped;
///         true once the broadcast has been posted.
bool Server::deliver(const std::string& buffer) {
    if (io_context_.stopped()) {
        return false;
    }
    // The by-copy capture is const inside a non-mutable lambda, so wrapping it
    // in std::move would not move anything; broadcast() takes a const reference.
    post(io_context_,
         [this, buffer] { connection_manager_.broadcast(buffer); });
    return true;
}
文件test.cpp
#include "server.h"
// Demo driver: the server runs on the main thread while a second thread
// delivers a numbered heartbeat every 5 seconds until deliver() returns false.
int main()
{
    Server s("127.0.0.1", "8989");

    std::thread yolo([&s] {
        using namespace std::literals;
        for (int i = 1;; ++i) {
            std::this_thread::sleep_for(5s);
            if (!s.deliver("HEARTBEAT DEMO " + std::to_string(i))) {
                break;
            }
        }
    });

    s.run();
    yolo.join();
}
当我正常关闭一个已连接、且正在接收大量数据的客户端时,我的服务器崩溃了。我怀疑这可能是一个对象生命周期(lifetime)错误,就像 boost ASIO 中的大多数错误一样,但是我自己找不出错在哪里。
每个客户端与服务器建立 2 个连接,其中一个用于同步,另一个是长期连接以接收持续更新。在“同步阶段”,客户端接收大量数据以与服务器状态同步(“状态”基本上是 JSON 格式的数据库数据)。同步后,同步连接关闭。客户端通过其他连接接收对数据库的更新(与“同步数据”相比,这些数据当然是非常小的数据)。
这些是相关文件:
connection.h
#pragma once
#include <array>
#include <memory>
#include <string>
#include <boost/asio.hpp>
class ConnectionManager;
/// Represents a single connection from a client.
///
/// Derives from enable_shared_from_this so member functions can obtain a
/// shared_ptr to the object and capture it in completion handlers.
class Connection : public std::enable_shared_from_this<Connection>
{
public:
/// Connections are neither copy-constructible nor copy-assignable.
Connection(const Connection&) = delete;
Connection& operator=(const Connection&) = delete;
/// Construct a connection with the given socket.
explicit Connection(boost::asio::ip::tcp::socket socket, ConnectionManager& manager);
/// Start the first asynchronous operation for the connection.
void start();
/// Stop all asynchronous operations associated with the connection.
void stop();
/// Perform an asynchronous write operation.
void do_write(const std::string& buffer);
/// Declared here; its definition is not part of the files shown.
int getNativeHandle();
~Connection();
private:
/// Perform an asynchronous read operation.
void do_read();
/// Socket for the connection.
boost::asio::ip::tcp::socket socket_;
/// The manager for this connection.
ConnectionManager& connection_manager_;
/// Buffer for incoming data.
std::array<char, 8192> buffer_;
/// Copy of the message passed to the most recent do_write() call.
std::string outgoing_buffer_;
};
typedef std::shared_ptr<Connection> connection_ptr;
connection.cpp
#include "connection.h"
#include <utility>
#include <vector>
#include <iostream>
#include <thread>
#include "connection_manager.h"
/// Construct a connection: takes over the given socket and keeps a reference
/// to the manager.
Connection::Connection(boost::asio::ip::tcp::socket socket, ConnectionManager& manager)
: socket_(std::move(socket))
, connection_manager_(manager)
{
}
/// Begin servicing the connection by issuing the first asynchronous read.
void Connection::start()
{
do_read();
}
/// Stop the connection by closing its socket.
void Connection::stop()
{
socket_.close();
}
/// User-provided destructor with an empty body.
Connection::~Connection()
{
}
/// Issue one asynchronous read into buffer_ and handle its completion.
/// On success the received bytes are split on spaces; if the first token is
/// "sync" the full state is sent with do_write(). Another read is then issued.
/// On error the connection is handed to connection_manager_.stop().
void Connection::do_read()
{
// Copied into the handler below together with `this`.
auto self(shared_from_this());
socket_.async_read_some(boost::asio::buffer(buffer_), [this, self](boost::system::error_code ec, std::size_t bytes_transferred) {
if (!ec) {
// Only the bytes received by this particular read are inspected.
std::string buff_str = std::string(buffer_.data(), bytes_transferred);
const auto& tokenized_buffer = split(buff_str, ' ');
if(!tokenized_buffer.empty() && tokenized_buffer[0] == "sync") {
/// "syncing connection" sends a specific text
/// hence I can separate between syncing and long-lived connections here and act accordingly.
const auto& exec_json_strs = getExecutionJsons();
const auto& order_json_strs = getOrdersAsJsons();
const auto& position_json_strs = getPositionsAsJsons();
const auto& all_json_strs = exec_json_strs + order_json_strs + position_json_strs + createSyncDoneJson();
/// this is potentially a very large data.
do_write(all_json_strs);
}
do_read();
} else {
connection_manager_.stop(shared_from_this());
}
});
}
/// Copy write_buffer into outgoing_buffer_ and start an asynchronous write of
/// the whole copy.
/// NOTE(review): outgoing_buffer_ is overwritten unconditionally; nothing here
/// checks whether an earlier async_write using it is still in progress.
void Connection::do_write(const std::string& write_buffer)
{
outgoing_buffer_ = write_buffer;
// Captured by the handler below.
auto self(shared_from_this());
boost::asio::async_write(socket_, boost::asio::buffer(outgoing_buffer_, outgoing_buffer_.size()), [this, self](boost::system::error_code ec, std::size_t transfer_size) {
if (!ec) {
/// everything is fine.
} else {
/// what to do here?
/// server crashes once I get error code 32 (EPIPE) here.
}
});
}
connection_manager.h
#pragma once
#include <set>
#include "connection.h"
/// Manages open connections so that they may be cleanly stopped when the server
/// needs to shut down.
class ConnectionManager
{
public:
/// The manager is neither copy-constructible nor copy-assignable.
ConnectionManager(const ConnectionManager&) = delete;
ConnectionManager& operator=(const ConnectionManager&) = delete;
/// Construct a connection manager.
ConnectionManager();
/// Add the specified connection to the manager and start it.
void start(connection_ptr c);
/// Stop the specified connection.
void stop(connection_ptr c);
/// Stop all connections.
void stop_all();
/// Call do_write(buffer) on every managed connection.
void sendAllConnections(const std::string& buffer);
private:
/// The managed connections.
std::set<connection_ptr> connections_;
};
connection_manager.cpp
#include "connection_manager.h"
/// User-provided default constructor with an empty body.
ConnectionManager::ConnectionManager()
{
}
/// Add the connection to the managed set, then start it.
void ConnectionManager::start(connection_ptr c)
{
connections_.insert(c);
c->start();
}
/// Remove the connection from the managed set, then stop it.
void ConnectionManager::stop(connection_ptr c)
{
connections_.erase(c);
c->stop();
}
/// Stop every managed connection, then empty the managed set.
void ConnectionManager::stop_all()
{
for (auto c: connections_)
c->stop();
connections_.clear();
}
/// this function is used to keep clients up to date with the changes, not used during syncing phase.
/// Calls do_write(buffer) on each managed connection in turn.
void ConnectionManager::sendAllConnections(const std::string& buffer)
{
for (auto c: connections_)
c->do_write(buffer);
}
server.h
#pragma once
#include <boost/asio.hpp>
#include <string>
#include "connection.h"
#include "connection_manager.h"
/// Owns the io_service, the signal set, the acceptor and the connection manager.
class Server
{
public:
/// The server is neither copy-constructible nor copy-assignable.
Server(const Server&) = delete;
Server& operator=(const Server&) = delete;
/// Construct the server to listen on the specified TCP address and port.
explicit Server(const std::string& address, const std::string& port);
/// Run the server's io_service loop.
void run();
/// Forward buffer to every connection via the connection manager.
void deliver(const std::string& buffer);
private:
/// Perform an asynchronous accept operation.
void do_accept();
/// Wait for a request to stop the server.
void do_await_stop();
/// The io_service used to perform asynchronous operations.
boost::asio::io_service io_service_;
/// The signal_set is used to register for process termination notifications.
boost::asio::signal_set signals_;
/// Acceptor used to listen for incoming connections.
boost::asio::ip::tcp::acceptor acceptor_;
/// The connection manager which owns all live connections.
ConnectionManager connection_manager_;
/// The *NEXT* socket to be accepted.
boost::asio::ip::tcp::socket socket_;
};
server.cpp
#include "server.h"
#include <signal.h>
#include <utility>
/// Construct the server: register the termination signals and start waiting
/// for them, bind and listen on address:port, and queue the first accept.
Server::Server(const std::string& address, const std::string& port)
: io_service_()
, signals_(io_service_)
, acceptor_(io_service_)
, connection_manager_()
, socket_(io_service_)
{
// Register to handle the signals that indicate when the server should exit.
// It is safe to register for the same signal multiple times in a program,
// provided all registration for the specified signal is made through Asio.
signals_.add(SIGINT);
signals_.add(SIGTERM);
#if defined(SIGQUIT)
signals_.add(SIGQUIT);
#endif // defined(SIGQUIT)
do_await_stop();
// Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
boost::asio::ip::tcp::resolver resolver(io_service_);
// The first resolver result is used as the listening endpoint.
boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve({address, port});
acceptor_.open(endpoint.protocol());
acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
do_accept();
}
/// Run the io_service event loop on the calling thread.
void Server::run()
{
// The io_service::run() call will block until all asynchronous operations
// have finished. While the server is running, there is always at least one
// asynchronous operation outstanding: the asynchronous accept call waiting
// for new incoming connections.
io_service_.run();
}
/// Accept one connection into socket_, hand it to the connection manager on
/// success, and queue the next accept.
void Server::do_accept()
{
acceptor_.async_accept(socket_,
[this](boost::system::error_code ec)
{
// Check whether the server was stopped by a signal before this
// completion handler had a chance to run.
if (!acceptor_.is_open())
{
return;
}
if (!ec)
{
// socket_ is moved into the new Connection.
connection_manager_.start(std::make_shared<Connection>(
std::move(socket_), connection_manager_));
}
// Reached for both success and failure, so accepting continues after an error.
do_accept();
});
}
/// Wait for one of the registered signals, then close the acceptor and stop
/// all connections. The error code and signal number are ignored.
void Server::do_await_stop()
{
signals_.async_wait(
[this](boost::system::error_code /*ec*/, int /*signo*/)
{
// The server is stopped by cancelling all outstanding asynchronous
// operations. Once all operations have finished the io_service::run()
// call will exit.
acceptor_.close();
connection_manager_.stop_all();
});
}
/// this function is used to keep clients up to date with the changes, not used during syncing phase.
/// Calls ConnectionManager::sendAllConnections() directly on the calling thread.
void Server::deliver(const std::string& buffer)
{
connection_manager_.sendAllConnections(buffer);
}
所以,我再重复一遍我的问题:当我正常关闭一个已连接、且正在接收大量数据的客户端时,我的服务器崩溃了,而我不知道为什么。
编辑:当我收到 EPIPE 错误时,async_write 函数发生崩溃。该应用程序是多线程的。有 4 个线程在生成数据时使用它们自己的数据调用 Server::deliver。 deliver() 用于使客户端保持最新状态,它与初始同步无关:同步是通过从数据库中获取的持久数据完成的。
我只有一个 io_service,所以我认为我不需要 strand。io_service::run 在主线程上调用,因此主线程处于阻塞状态。
在审查代码时,我先补上了一些缺失的代码片段:
namespace /*missing code stubs*/ {
// Split `input` at every `delim` character and return the pieces.
// NOTE(review): the string_view results presumably alias `input`'s storage,
// so that storage must outlive them — confirm.
auto split(std::string_view input, char delim) {
std::vector<std::string_view> result;
boost::algorithm::split(result, input,
boost::algorithm::is_from_range(delim, delim));
return result;
}
// Placeholder data sources: each one returns an empty string.
std::string getExecutionJsons() { return ""; }
std::string getOrdersAsJsons() { return ""; }
std::string getPositionsAsJsons() { return ""; }
std::string createSyncDoneJson() { return ""; }
}
现在我注意到的是:
你只有一个
io_service
,所以只有一个线程。好的,除非您的其他代码中有线程(main
,例如?),否则不需要 strand。怀疑有线程在起作用的一个特别原因是:没有人能够调用
Server::deliver
因为run()
正在阻塞。这意味着无论何时调用deliver()
现在它都会导致数据竞争,从而导致 Undefined Behaviour。那条随口写下的注释
/// this function is used to keep clients up to date with the changes, /// not used during syncing phase.
对消除这种担忧没有太大作用。代码需要防止被误用。注释不会被执行。做得更好:
/// Queue a broadcast of `buffer` on the I/O context instead of running it on
/// the caller's thread.
void Server::deliver(const std::string& buffer) {
    // `buffer` is copied into the closure so that it outlives the caller's
    // argument. The copy is const inside the non-mutable lambda, so wrapping
    // it in std::move would still copy; it is passed on as-is.
    post(io_context_, [this, buffer] { connection_manager_.broadcast(buffer); });
}
您在接受“新”写入之前没有检查之前的写入是否已完成。这意味着调用
Connection::do_write
会导致 Undefined Behaviour,原因有两个:在使用该缓冲区的正在进行的异步操作期间修改
outgoing_buffer_
是 UB;在同一个 IO 对象上有两个重叠的
async_write
是 UB(参见 docs)。
解决这个问题的典型方法是使用一个传出消息队列。
使用
async_read_some
很少是您想要的,特别是因为读取到的数据不会累积到动态缓冲区中。这意味着如果您的数据包在意想不到的边界处被拆分,您可能根本检测不到命令,或者检测出错误的命令。而是考虑
asio::async_read_until
使用动态缓冲区(例如- 直接读入
std::string
,因此您不必将缓冲区复制到字符串中 - 读入
streambuf
以便您可以使用std::istream(&sbuf_)
来解析而不是标记化
- 直接读入
连接
all_json_strs
(它们显然必须是拥有数据的文本容器)是一种浪费。相反,使用 const-buffer-sequence 将它们全部组合起来而不进行复制。更好的是,考虑一种流式的 JSON 序列化方法,这样在任何给定时刻都不需要把所有 JSON 同时序列化在内存中。
不要声明空析构函数 (
~Connection
)。它们是负优化(pessimization)。同样适用于空构造函数 (
ConnectionManager
)。如果必须,请考虑ConnectionManager::ConnectionManager() = default;
getNativeHandle
给我更多关于其他可能干扰的代码的问题。例如。它可能表明其他库正在执行操作,这又会导致重叠 reads/writes,或者它可能是线程上有更多代码的迹象(因为Server::run()
根据定义是阻塞的)。连接管理器或许应该持有
weak_ptr
,因此Connection
s 最终能够被销毁。现在,按照定义,最后一个引用保存在连接管理器中,这意味着当对等方断开连接或会话因其他原因失败时,没有任何对象会被析构。这种写法不符合惯用法:
// Check whether the server was stopped by a signal before this // completion handler had a chance to run. if (!acceptor_.is_open()) { return; }
如果您关闭了接受器,完成处理程序无论如何都会被调用
error::operation_aborted
。简单地处理它,例如在最终版本中,我稍后 post:// separate strand for each connection - just in case you ever add threads acceptor_.async_accept( make_strand(io_context_), [this](error_code ec, tcp::socket sock) { if (!ec) { connection_manager_.register_and_start( std::make_shared<Connection>(std::move(sock), connection_manager_)); do_accept(); } });
我注意到这条评论:
// The server is stopped by cancelling all outstanding asynchronous // operations. Once all operations have finished the io_service::run() // call will exit.
事实上,您的代码中从未对任何 IO 对象调用过
cancel()
。同样,注释不会被执行。最好确实按照注释所说的去做,并让析构函数来关闭资源。这可以防止对象在关闭后仍被使用(use-after-close)时出现的虚假错误,也可以防止非常烦人的竞争条件,例如:您关闭了句柄,另一个线程在同一个文件描述符上重新打开了新的流,而您已经把该句柄交给了第三方(使用getNativeHandle
)...您知道这会导致什么吗?
重现问题?
通过这种方式进行审查后,我试图重现该问题,因此我创建了虚假数据:
/// Fake execution data: 1024 'E' characters.
std::string getExecutionJsons() {
    constexpr std::size_t length = 1024;
    return std::string(length, 'E');
}

/// Fake order data: 13312 'O' characters.
std::string getOrdersAsJsons() {
    constexpr std::size_t length = 13312;
    return std::string(length, 'O');
}

/// Fake position data: 8192 'P' characters.
std::string getPositionsAsJsons() {
    constexpr std::size_t length = 8192;
    return std::string(length, 'P');
}

/// Fake "sync done" marker: 24576 'C' characters.
std::string createSyncDoneJson() {
    constexpr std::size_t length = 24576;
    return std::string(length, 'C');
}
对连接进行一些小调整 class:
// Excerpt from the read completion handler.
// Copy the bytes received by this read into an owning string, then split
// that string on spaces.
std::string buff_str =
std::string(buffer_.data(), bytes_transferred);
const auto& tokenized_buffer = split(buff_str, ' ');
// Only a request whose first token is exactly "sync" triggers the bulk reply.
if (!tokenized_buffer.empty() &&
tokenized_buffer[0] == "sync") {
std::cerr << "sync detected on " << socket_.remote_endpoint() << std::endl;
/// A "syncing connection" sends a specific text, which is how syncing
/// and long-lived connections are told apart here so that each can be
/// handled accordingly.
const auto& exec_json_strs = getExecutionJsons();
const auto& order_json_strs = getOrdersAsJsons();
const auto& position_json_strs = getPositionsAsJsons();
// Concatenate all four payloads into one temporary string.
const auto& all_json_strs = exec_json_strs +
order_json_strs + position_json_strs +
createSyncDoneJson();
std::cerr << "All json length: " << all_json_strs.length() << std::endl;
/// This is potentially a very large amount of data.
do_write(all_json_strs); // already on strand!
}
我们得到服务器输出
sync detected on 127.0.0.1:43012
All json length: 47104
sync detected on 127.0.0.1:43044
All json length: 47104
并且使用 netcat 伪造的客户端:
$ netcat localhost 8989 <<< 'sync me' > expected
^C
$ wc -c expected
47104 expected
很好。现在让我们过早断开连接:
netcat localhost 8989 -w0 <<< 'sync me' > truncated
$ wc -c truncated
0 truncated
所以,它确实导致提前关闭,但服务器仍然说
sync detected on 127.0.0.1:44176
All json length: 47104
让我们也检测 do_write
:
// Instrumented excerpt of do_write: send the whole of outgoing_buffer_ and
// log the outcome. `self` is captured by the handler (presumably the
// shared_ptr obtained from shared_from_this(); it is defined outside this
// excerpt).
async_write( //
socket_, boost::asio::buffer(outgoing_buffer_, outgoing_buffer_.size()),
[/*this,*/ self](error_code ec, size_t transfer_size) {
// Report how many bytes were transferred and the completion status.
std::cerr << "do_write completion: " << transfer_size << " bytes ("
<< ec.message() << ")" << std::endl;
if (!ec) {
/// Everything is fine.
} else {
/// What to do here?
// FIXME: probably cancel the read loop so that the connection
// closes?
}
});
现在我们看到:
sync detected on 127.0.0.1:44494
All json length: 47104
do_write completion: 47104 bytes (Success)
sync detected on 127.0.0.1:44512
All json length: 47104
do_write completion: 32768 bytes (Operation canceled)
对于一个断开连接和一个“正常”连接。
没有崩溃或未定义行为的迹象。让我们用 -fsanitize=address,undefined
来检查:干净的记录,甚至添加心跳:
int main() {
Server s("127.0.0.1", "8989");
std::thread yolo([&s] {
using namespace std::literals;
int i = 1;
do {
std::this_thread::sleep_for(5s);
} while (s.deliver("HEARTBEAT DEMO " + std::to_string(i++)));
});
s.run();
yolo.join();
}
结论
上面突出显示但未解决的唯一问题是:
未显示其他线程问题(可能通过
getNativeHandle
);以及您可以在 Connection 的下述函数中进行重叠写入这一事实:
do_write
。修正:
void Connection::write(std::string msg) { // public, might not be on the strand
    post(socket_.get_executor(),
         [self = shared_from_this(), msg = std::move(msg)]() mutable {
             self->do_write(std::move(msg));
         });
}
void Connection::do_write(std::string msg) { // assumed on the strand
    outgoing_.push_back(std::move(msg));
    if (outgoing_.size() == 1)
        do_write_loop();
}
void Connection::do_write_loop() {
    if (outgoing_.size() == 0)
        return;
    auto self(shared_from_this());
    async_write( //
        socket_, boost::asio::buffer(outgoing_.front()),
        [this, self](error_code ec, size_t transfer_size) {
            std::cerr << "write completion: " << transfer_size << " bytes ("
                      << ec.message() << ")" << std::endl;
            if (!ec) {
                outgoing_.pop_front();
                do_write_loop();
            } else {
                socket_.cancel();
                // This would ideally be enough to free the connection, but
                // since `ConnectionManager` doesn't use `weak_ptr` you need to
                // force the issue using kind of an "umbilical cord reflux":
                connection_manager_.stop(self);
            }
        });
}
如您所见,我还拆分了 write
/do_write
以防止在 strand 之外调用。stop
也做了同样的处理。
完整列表
包含以上所有 remarks/fixes 的完整列表:
文件
connection.h
#pragma once #include <boost/asio.hpp> #include <array> #include <deque> #include <memory> #include <string> using boost::asio::ip::tcp; class ConnectionManager; /// Represents a single connection from a client. class Connection : public std::enable_shared_from_this<Connection> { public: Connection(const Connection&) = delete; Connection& operator=(const Connection&) = delete; /// Construct a connection with the given socket. explicit Connection(tcp::socket socket, ConnectionManager& manager); void start(); void stop(); void write(std::string msg); private: void do_stop(); void do_write(std::string msg); void do_write_loop(); /// Perform an asynchronous read operation. void do_read(); /// Socket for the connection. tcp::socket socket_; /// The manager for this connection. ConnectionManager& connection_manager_; /// Buffer for incoming data. std::array<char, 8192> buffer_; std::deque<std::string> outgoing_; }; using connection_ptr = std::shared_ptr<Connection>;
文件
connection_manager.h
#pragma once #include <list> #include "connection.h" /// Manages open connections so that they may be cleanly stopped when the server /// needs to shut down. class ConnectionManager { public: ConnectionManager(const ConnectionManager&) = delete; ConnectionManager& operator=(const ConnectionManager&) = delete; ConnectionManager() = default; // could be split across h/cpp if you wanted void register_and_start(connection_ptr c); void stop(connection_ptr c); void stop_all(); void broadcast(const std::string& buffer); // purge defunct connections, returns remaining active connections size_t garbage_collect(); private: using handle = std::weak_ptr<connection_ptr::element_type>; std::list<handle> connections_; };
文件
server.h
#pragma once #include <boost/asio.hpp> #include <string> #include "connection.h" #include "connection_manager.h" class Server { public: Server(const Server&) = delete; Server& operator=(const Server&) = delete; /// Construct the server to listen on the specified TCP address and port, /// and serve up files from the given directory. explicit Server(const std::string& address, const std::string& port); /// Run the server's io_service loop. void run(); bool deliver(const std::string& buffer); private: void do_accept(); void do_await_signal(); boost::asio::io_context io_context_; boost::asio::any_io_executor strand_{io_context_.get_executor()}; boost::asio::signal_set signals_{strand_}; tcp::acceptor acceptor_{strand_}; ConnectionManager connection_manager_; };
文件
connection.cpp
#include "connection.h"

#include <boost/algorithm/string.hpp>
#include <iostream>
#include <string>
#include <string_view>
#include <thread>
#include <utility>
#include <vector>

#include "connection_manager.h"

using boost::system::error_code;

Connection::Connection(tcp::socket socket, ConnectionManager& manager)
    : socket_(std::move(socket))
    , connection_manager_(manager) {}

/// Start the read loop.
void Connection::start() {
    // always assumed on the strand (since connection was just constructed)
    do_read();
}

/// Request a stop; safe to call from outside the connection's executor.
void Connection::stop() { // public, might not be on the strand
    post(socket_.get_executor(),
         [self = shared_from_this()]() mutable { self->do_stop(); });
}

/// Cancel the socket's outstanding operations.
void Connection::do_stop() { // assumed on the strand
    // Non-throwing overload: an exception thrown from a posted handler would
    // escape io_context::run().
    error_code ignored;
    socket_.cancel(ignored);
    // trust shared pointer to destruct
}

namespace /*missing code stubs*/ {
    /// Split `input` at every `delim`; the pieces are views into `input`.
    auto split(std::string_view input, char delim) {
        std::vector<std::string_view> result;
        boost::algorithm::split(result, input,
                                boost::algorithm::is_from_range(delim, delim));
        return result;
    }

    // Fake payloads used to reproduce the problem.
    std::string getExecutionJsons() { return std::string(1024, 'E'); }
    std::string getOrdersAsJsons() { return std::string(13312, 'O'); }
    std::string getPositionsAsJsons() { return std::string(8192, 'P'); }
    std::string createSyncDoneJson() { return std::string(24576, 'C'); }
} // namespace

/// Read some data; on a "sync" request queue the bulk reply, then read again.
/// On a read error the connection is stopped through the manager.
void Connection::do_read() {
    auto self(shared_from_this());
    socket_.async_read_some(
        boost::asio::buffer(buffer_),
        [this, self](error_code ec, size_t bytes_transferred) {
            if (ec) {
                std::cerr << "do_read terminating: " << ec.message() << '\n';
                connection_manager_.stop(self);
                return;
            }

            // The tokens are views into buffer_. They are only used before
            // the next read is started, so no copy of the data is needed.
            const std::string_view received(buffer_.data(), bytes_transferred);
            const auto tokenized_buffer = split(received, ' ');

            if (!tokenized_buffer.empty() && tokenized_buffer[0] == "sync") {
                // remote_endpoint() without an error_code throws when the
                // peer is already gone. An exception here would escape
                // io_context::run(), so use the non-throwing overload.
                error_code ep_ec;
                const auto peer = socket_.remote_endpoint(ep_ec);
                if (ep_ec) {
                    std::cerr << "sync detected on <unknown peer: "
                              << ep_ec.message() << ">\n";
                } else {
                    std::cerr << "sync detected on " << peer << '\n';
                }

                /// A "syncing connection" sends a specific text, which is
                /// how syncing and long-lived connections are told apart
                /// here.
                // Build the reply in one string instead of a chain of
                // operator+ temporaries.
                std::string all_json_strs = getExecutionJsons();
                all_json_strs += getOrdersAsJsons();
                all_json_strs += getPositionsAsJsons();
                all_json_strs += createSyncDoneJson();
                std::cerr << "All json length: " << all_json_strs.length()
                          << '\n';

                /// This is potentially a very large amount of data.
                do_write(std::move(all_json_strs)); // already on strand!
            }

            do_read();
        });
}

/// Queue `msg` for sending; safe to call from outside the connection's
/// executor.
void Connection::write(std::string msg) { // public, might not be on the strand
    post(socket_.get_executor(),
         [self = shared_from_this(), msg = std::move(msg)]() mutable {
             self->do_write(std::move(msg));
         });
}

/// Append `msg` to the outgoing queue and start the write loop if it is idle.
void Connection::do_write(std::string msg) { // assumed on the strand
    outgoing_.push_back(std::move(msg));
    // Exactly one element means no write is in flight yet.
    if (outgoing_.size() == 1)
        do_write_loop();
}

/// Write the front of the queue; on success pop it and continue with the
/// next message, so at most one async_write is outstanding at a time.
void Connection::do_write_loop() {
    if (outgoing_.empty())
        return;

    auto self(shared_from_this());
    async_write( //
        socket_, boost::asio::buffer(outgoing_.front()),
        [this, self](error_code ec, size_t transfer_size) {
            std::cerr << "write completion: " << transfer_size << " bytes ("
                      << ec.message() << ")\n";
            if (!ec) {
                outgoing_.pop_front();
                do_write_loop();
            } else {
                // Cancel the remaining operations (e.g. the pending read)
                // and ask the manager to stop this connection as well.
                error_code ignored;
                socket_.cancel(ignored);
                connection_manager_.stop(self);
            }
        });
}
文件
connection_manager.cpp
#include "connection_manager.h"

/// Track `c` through a weak handle and start it.
void ConnectionManager::register_and_start(connection_ptr c) {
    // Drop handles of connections that have already been destroyed, so the
    // list does not grow with every client that ever connected.
    garbage_collect();
    connections_.emplace_back(c);
    c->start();
}

/// Ask a single connection to stop.
void ConnectionManager::stop(connection_ptr c) { c->stop(); }

/// Ask every connection that is still alive to stop.
void ConnectionManager::stop_all() {
    // Iterate by reference: copying a weak_ptr costs a reference-count update.
    for (const auto& h : connections_)
        if (auto c = h.lock())
            c->stop();
}

/// This function is used to keep clients up to date with the changes, not
/// used during the syncing phase.
void ConnectionManager::broadcast(const std::string& buffer) {
    for (const auto& h : connections_)
        if (auto c = h.lock())
            c->write(buffer);
}

/// Remove expired handles; returns the number of entries that remain.
size_t ConnectionManager::garbage_collect() {
    connections_.remove_if([](const handle& h) { return h.expired(); });
    return connections_.size();
}
文件
server.cpp
#include "server.h"

#include <signal.h>
#include <utility>

using boost::system::error_code;

/// Register the shutdown signals, open the listening acceptor on
/// `address`:`port` and arm the first accept.
Server::Server(const std::string& address, const std::string& port)
    : io_context_(1) // THREAD HINT: single threaded
    , connection_manager_() {
    // Register to handle the signals that indicate when the server should exit.
    // It is safe to register for the same signal multiple times in a program,
    // provided all registration for the specified signal is made through Asio.
    signals_.add(SIGINT);
    signals_.add(SIGTERM);
#if defined(SIGQUIT)
    signals_.add(SIGQUIT);
#endif // defined(SIGQUIT)

    do_await_signal();

    // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
    tcp::resolver resolver(io_context_);
    // Use the (host, service) overload rather than the deprecated
    // query-based one; the first resolved entry is used.
    tcp::endpoint endpoint = *resolver.resolve(address, port).begin();
    acceptor_.open(endpoint.protocol());
    acceptor_.set_option(tcp::acceptor::reuse_address(true));
    acceptor_.bind(endpoint);
    acceptor_.listen();

    do_accept();
}

/// Run the event loop on the calling thread.
void Server::run() {
    // The io_context::run() call will block until all asynchronous operations
    // have finished. While the server is running, there is always at least one
    // asynchronous operation outstanding: the asynchronous accept call waiting
    // for new incoming connections.
    io_context_.run();
}

/// Arm one asynchronous accept; the handler re-arms it until the accept is
/// cancelled.
void Server::do_accept() {
    // separate strand for each connection - just in case you ever add threads
    acceptor_.async_accept(
        make_strand(io_context_), [this](error_code ec, tcp::socket sock) {
            // Cancelled by do_await_signal(): the server is shutting down,
            // so the accept loop ends here.
            if (ec == boost::asio::error::operation_aborted)
                return;

            if (!ec) {
                connection_manager_.register_and_start(
                    std::make_shared<Connection>(std::move(sock),
                                                 connection_manager_));
            }

            // Any other error concerns this one accept only; keep accepting
            // instead of silently ending the loop for good.
            do_accept();
        });
}

/// Wait for a registered signal, then cancel the accept and stop all
/// connections.
void Server::do_await_signal() {
    signals_.async_wait([this](error_code /*ec*/, int /*signo*/) {
        // The handler runs on the executor that signals_ was constructed
        // with (strand_).
        // The server is stopped by cancelling all outstanding asynchronous
        // operations. Once all operations have finished the
        // io_context::run() call will exit.
        acceptor_.cancel();
        connection_manager_.stop_all();
    });
}

/// Queue a broadcast of `buffer` on the io_context.
/// @return false when the io_context has stopped (nothing is queued),
///         true otherwise.
bool Server::deliver(const std::string& buffer) {
    if (io_context_.stopped()) {
        return false;
    }
    // `buffer` is copied into the closure. The copy is const inside the
    // non-mutable lambda, so it is passed on as-is (std::move would be a
    // no-op here).
    post(io_context_,
         [this, buffer] { connection_manager_.broadcast(buffer); });
    return true;
}
文件
test.cpp
#include "server.h"

#include <chrono>
#include <string>
#include <thread>

/// Demo driver: run the server on the main thread while a second thread
/// delivers a heartbeat message every five seconds.
int main() {
    Server s("127.0.0.1", "8989");

    // The heartbeat thread stops as soon as deliver() returns false.
    std::thread yolo([&s] {
        using namespace std::literals;
        int i = 1;
        do {
            std::this_thread::sleep_for(5s);
        } while (s.deliver("HEARTBEAT DEMO " + std::to_string(i++)));
    });

    s.run();
    yolo.join();
}