如何从第一个服务器 运行 第二个服务器?

How to run second server from first?

下午好。我正在使用这段代码并希望对其进行修改以满足我的需要。 https://www.boost.org/doc/libs/1_77_0/doc/html/boost_asio/example/cpp11/echo/async_tcp_echo_server.cpp

在函数 do_write 中,我想启动另一个服务器,它将在任意端口上,在某个时刻它可以自行关闭并向它所在的连接发送一个关于此的信号已创建(当然,连接一直处于活动状态)。 我无法将服务器添加到目前已经启动的 io_context。还有其他可能吗?我尝试创建 io_context 的另一个实例并在不同的线程上创建服务器,但这对我也不起作用。

我从那个链接示例开始,并使 session 读取逐行命令:

  • LISTEN <port> 在该端口上启动一个新的侦听器
  • STOP <port> 停止监听器
  • EXIT关闭连接

侦听器按端口号保存在全局映射中:

using Port     = uint16_t;
using Server   = std::weak_ptr<class server>;
std::map<Port, Server> g_listeners;

服务器采用可选 Callback:

using Callback = std::function<void()>;

如果侦听器仍处于活动状态,则用于向最初启动它的连接报告侦听器退出。

命令

阅读逐行命令非常简单:

void do_read()
{
    async_read_until(
        socket_, data_, "\n",
        [this, self = shared_from_this()](error_code ec, size_t length) {
            if (ec) {
                std::cerr << "do_read: " << ec.message() << std::endl;
            } else {
                std::string line;
                getline(std::istream(&data_), line);
                if (handle(line))
                    do_read();
            }
        });
}

handle 解析它们 - 我尽可能简单:

bool handle(std::string const& command)
{
    try {
        std::istringstream parser(command);
        parser.exceptions(std::ios::failbit | std::ios::badbit);

        std::string cmd;
        Port        port;

        if (parser >> cmd && (cmd == "LISTEN") && parser >> port) {
            start_listener(port);
            return true;
        }

        parser.clear();
        parser.str(command);
        if (parser >> cmd && (cmd == "STOP") && parser >> port) {
            stop_listener(port);
            return true;
        }

        parser.clear();
        parser.str(command);
        if (parser >> cmd && (cmd == "EXIT")) {
            message("Goodbye");
            return false;
        }
        message("Invalid command"s);
    } catch (std::exception const& e) {
        message("Invalid argument"s);
    }
    return true;
}

现在命令 start_listenerstop_listenerg_listeners 容器中插入或删除 server 个实例:

void session::start_listener(Port port)
{
    auto [it, inserted] = g_listeners.try_emplace(port);

    if (!inserted) {
        message("Already listening on port " + std::to_string(port));
    } else {
        auto on_close = [handle = weak_from_this(), port] {
            if (auto self = handle.lock())
                self->message("The listener for port " + std::to_string(port) + " has closed");
        };
        auto s = std::make_shared<server>( //
            socket_.get_executor(), port, on_close);
        it->second = s;
        s->start();
        message("Started listening on port " + std::to_string(port));
    }
}

void session::stop_listener(Port port)
{
    auto it = g_listeners.find(port);
    if (it != g_listeners.end()) {
        if (auto server = it->second.lock())
        {
            message("Stopping listener on port " + std::to_string(port));
            server->stop();
        } else {
            // when two connections simultaneously STOP the same listener?
            message("Listener on port " + std::to_string(port) + " already stopped");
        }
        g_listeners.erase(it);
    } else {
        message("No listener on port " + std::to_string(port));
    }
}

请注意,两端都使用弱指针来防止对已销毁的实例进行操作,例如

  • 当启动侦听器的连接在回调可以执行之前关闭时
  • 同时从多个连接命令侦听器 STOP

其他注意事项

对于响应消息,我使用 outbox_ 模式,因此即使排队的消息不止一条,缓冲区的生命周期也能得到保证。

对于服务器,我将 io_context& 参数更改为 executor;结果是等效的,但从任何 IO 对象获取执行程序比传递对 io_context& 的引用要简单得多。当然,您可以通过将 io_context 设为全局变量来“作弊”。

演示

Live On Coliru

#include <boost/asio.hpp>
#include <deque>
#include <iostream>
#include <iomanip>

using boost::asio::ip::tcp;
using boost::system::error_code;
using executor_type = boost::asio::any_io_executor;
using namespace std::literals;

using Port     = uint16_t;
using Callback = std::function<void()>;
using Server   = std::weak_ptr<class server>;
std::map<Port, Server> g_listeners;

class session : public std::enable_shared_from_this<session> {
  public:
    session(tcp::socket socket) : socket_(std::move(socket)) {}

    void start()
    {
        do_read();
        message("Welcome");
    }

  private:
    void do_read()
    {
        async_read_until(
            socket_, data_, "\n",
            [this, self = shared_from_this()](error_code ec, size_t length) {
                if (ec) {
                    std::cerr << "do_read: " << ec.message() << std::endl;
                } else {
                    std::string line;
                    getline(std::istream(&data_), line);
                    if (handle(line))
                        do_read();
                }
            });
    }

    bool handle(std::string const& command)
    {
        try {
            std::istringstream parser(command);
            parser.exceptions(std::ios::failbit | std::ios::badbit);

            std::string cmd;
            Port        port;

            if (parser >> cmd && (cmd == "LISTEN") && parser >> port) {
                start_listener(port);
                return true;
            }

            parser.clear();
            parser.str(command);
            if (parser >> cmd && (cmd == "STOP") && parser >> port) {
                stop_listener(port);
                return true;
            }

            parser.clear();
            parser.str(command);
            if (parser >> cmd && (cmd == "EXIT")) {
                message("Goodbye");
                return false;
            }
            message("Invalid command"s);
        } catch (std::exception const& e) {
            message("Invalid argument"s);
        }
        return true;
    }

    void message(std::string msg) {
        outbox_.push_back(std::move(msg) + "\n");
        if (outbox_.size() == 1)
            do_write();
    }

    void do_write() {
        async_write( //
            socket_, boost::asio::buffer(outbox_.front()),
            [this, self = shared_from_this()](error_code ec, size_t) {
                if (ec) {
                    std::cerr << "do_write: " << ec.message() << std::endl;
                }
                outbox_.pop_front();
                if (!outbox_.empty())
                    do_write();
            });
    }

    void start_listener(Port port);
    void stop_listener(Port port);

    tcp::socket socket_;
    boost::asio::streambuf data_{32*1024}; // max size
    std::deque<std::string> outbox_;
};

class server: public std::enable_shared_from_this<server> {
  public:
    server(
        executor_type exe, short port, Callback callback = [] {})
        : acceptor_(exe, tcp::endpoint(tcp::v4(), port))
        , on_close_(callback)
    {
    }

    void start() { do_accept(); }
    void stop() { acceptor_.close(); }

  private:
    void do_accept()
    {
        acceptor_.async_accept(
            [this, self = shared_from_this()] //
            (error_code ec, tcp::socket socket) {
                if (!ec) {
                    std::make_shared<session>(std::move(socket))->start();
                    do_accept();
                } else {
                    if (on_close_)
                        on_close_();
                }
            });
    }

    tcp::acceptor acceptor_;
    Callback      on_close_;
};

void session::start_listener(Port port)
{
    auto [it, inserted] = g_listeners.try_emplace(port);

    if (!inserted) {
        message("Already listening on port " + std::to_string(port));
    } else {
        auto on_close = [handle = weak_from_this(), port] {
            if (auto self = handle.lock())
                self->message("The listener for port " + std::to_string(port) + " has closed");
        };
        auto s = std::make_shared<server>( //
            socket_.get_executor(), port, on_close);
        it->second = s;
        s->start();
        message("Started listening on port " + std::to_string(port));
    }
}

void session::stop_listener(Port port)
{
    auto it = g_listeners.find(port);
    if (it != g_listeners.end()) {
        if (auto server = it->second.lock())
        {
            message("Stopping listener on port " + std::to_string(port));
            server->stop();
        } else {
            // when two connections simultaneously STOP the same listener?
            message("Listener on port " + std::to_string(port) + " already stopped");
        }
        g_listeners.erase(it);
    } else {
        message("No listener on port " + std::to_string(port));
    }
}

int main(int argc, char* argv[])
{
    try {
        if (argc != 2) {
            std::cerr << "Usage: async_tcp_echo_server <port>\n";
            return 1;
        }

        boost::asio::io_context io_context;

        {
            Port port = std::atoi(argv[1]);
            auto s = std::make_shared<server>(io_context.get_executor(), port);
            s->start();

            g_listeners.emplace(port, s);
        }

        io_context.run();
    } catch (std::exception& e) {
        std::cerr << "Exception: " << e.what() << "\n";
    }
}

哪个演示是交互式的: