Boost Beast Async Websocket Server: How to interface with session?

I don't know why, but I can't wrap my head around the Boost Beast websocket server and how you can (or should) interact with it.

The basic program I made looks like this, spread across 2 classes (WebSocketListener and WebSocketSession): https://www.boost.org/doc/libs/develop/libs/beast/example/websocket/server/async/websocket_server_async.cpp

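Everything works: I can connect, and it echoes messages. We will only ever have 1 active session, and I'm struggling to understand how I can interface with this session from outside its class, for example from my int main() or from another class that is responsible for issuing reads/writes. We will be using a simple Command design pattern: commands arrive asynchronously in a buffer, get processed against the hardware, and the result is then sent back with async_write. The reading and queuing is straightforward and will be done in the WebsocketSession, but everything I have seen for writing does the reading/writing directly inside the session and takes no external input.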

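I have seen examples using things like boost::asio::async_write(socket, buffer, ...), but I'm struggling to understand how I would get a reference to said socket when the session is created by the listener itself.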

Instead of depending on a socket from outside the session, I would have the session depend on your program logic.

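That's because the session (connection) governs its own lifetime: it arrives spontaneously and may disconnect spontaneously. Your hardware, most likely, does not.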

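So, borrowing the concept of "Dependency Injection", tell your listener about your application logic, and then call into that logic from the session. (The listener will "inject" the dependency into each newly created session.)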
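The lifetime point can be made concrete without any networking. The following is only a sketch using the standard library: `Session` and `Hardware` are hypothetical stand-ins, not types from Beast or from the original code. It illustrates that if outside code ever does need to reach a session (as the update further down does), a `std::weak_ptr` handle lets it do so without keeping a disconnected session alive.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical session: in a real server it would own the socket and keep
// itself alive through shared_from_this() in its completion handlers.
struct Session : std::enable_shared_from_this<Session> {
    std::vector<std::string> sent; // stands in for data written to the socket

    void post_message(std::string msg) { sent.push_back(std::move(msg)); }
};

// Hypothetical outside component. It holds only a weak handle, so it never
// extends the lifetime of a session that has already gone away.
struct Hardware {
    std::weak_ptr<Session> client;

    // Returns true if the message reached a live session.
    bool notify(std::string const& msg) {
        if (auto sess = client.lock()) {
            sess->post_message(msg);
            return true;
        }
        return false; // the client has already disconnected
    }
};
```

With this shape, `notify` simply reports failure once the last `shared_ptr` to the session is gone, instead of touching a dead connection.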

Let's start from a simplified/modernized version of the linked example.

Now, at the point where we prepare the response, you want your own logic injected, so let's write it the way we would like it to look:

void on_read(beast::error_code ec, std::size_t /*bytes_transferred*/) {
    if (ec == websocket::error::closed) return;
    if (ec.failed())                    return fail(ec, "read");

    // Process the message
    response_ = logic_->Process(beast::buffers_to_string(buffer_.data()));

    ws_.async_write(
        net::buffer(response_),
        beast::bind_front_handler(&session::on_write, shared_from_this()));
}

Here we declare the members and initialize them from the constructor:

    std::string                          response_;
    std::shared_ptr<AppDomain::Logic>    logic_;

  public:
    explicit session(tcp::socket&&                     socket,
                     std::shared_ptr<AppDomain::Logic> logic)
        : ws_(std::move(socket))
        , logic_(logic) {}

Now, we need to inject the logic into the listener, so that it can pass the logic along:

class listener : public std::enable_shared_from_this<listener> {
    net::any_io_executor              ex_;
    tcp::acceptor                     acceptor_;
    std::shared_ptr<AppDomain::Logic> logic_;

  public:
    listener(net::any_io_executor ex, tcp::endpoint endpoint,
             std::shared_ptr<AppDomain::Logic> logic)
        : ex_(ex)
        , acceptor_(ex)
        , logic_(logic) {

So that we can pass it along to each new session:

void on_accept(beast::error_code ec, tcp::socket socket) {
    if (ec) {
        fail(ec, "accept");
    } else {
        std::make_shared<session>(std::move(socket), logic_)->run();
    }

    // Accept another connection
    do_accept();
}

Now we create the real logic in main:

auto logic = std::make_shared<AppDomain::Logic>("StackOverflow Demo/");

try {
    // The thread_pool is the execution context required for all I/O
    net::thread_pool ioc(threads);

    std::make_shared<listener>(ioc.get_executor(),
                               tcp::endpoint{address, port}, logic)
        ->run();

    ioc.join();
} catch (beast::system_error const& se) {
    fail(se.code(), "listener");
}

Demo Logic

Just for fun, let's implement some random logic that might be implemented in hardware in the future:

namespace AppDomain {
    struct Logic {
        std::string banner;
        Logic(std::string msg) : banner(std::move(msg)) {}

        std::string Process(std::string request) {
            std::cout << "Processing: " << std::quoted(request) << std::endl;

            std::string result;

            auto fold = [&result](auto op, double initial) {
                return [=, &result](auto& ctx) {
                    auto& args = _attr(ctx);
                    auto  v = accumulate(args.begin(), args.end(), initial, op);
                    result  = "Fold:" + std::to_string(v);
                };
            };

            auto invalid = [&result](auto& ctx) {
                result = "Invalid Command: " + _attr(ctx);
            };

            using namespace boost::spirit::x3;
            auto args = rule<void, std::vector<double>>{} = '(' >> double_ % ',' >> ')';
            auto add = "adding"      >> args[fold(std::plus<>{}, 0)];
            auto mul = "multiplying" >> args[fold(std::multiplies<>{}, 1)];
            auto err = lexeme[+char_][invalid];

            phrase_parse(begin(request), end(request), add | mul | err, blank);

            return banner + result;
        }
    };
} // namespace AppDomain

Now you can see it in action: Full Listing

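Where To Go From Here

What if you need multiple responses for one request?

You need a queue. I usually call these an outbox, so searching for outbox_, _outbox etc. will turn up lots of examples.

Those examples also show how to deal with other situations where writes can be "externally initiated", and how to enqueue those writes safely. Perhaps a particularly compelling example is here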
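To make the outbox idea concrete before the full listing, here is a minimal sketch using only the standard library. `FakeStream` and its `complete_one()` are hypothetical stand-ins for a websocket stream and its I/O completion, and this `Session` is a simplified illustration rather than the session class from the listings. The point it shows is the rule "enqueue always, but start a write only when the queue was empty", which keeps at most one write in flight.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a websocket stream: async_write only records the
// payload and parks the completion handler until complete_one() is called.
struct FakeStream {
    std::vector<std::string> wire;    // everything "sent" so far, in order
    std::function<void()>    pending; // handler of the write in flight, if any

    void async_write(std::string const& payload, std::function<void()> on_done) {
        assert(!pending && "only one write may be in flight at a time");
        wire.push_back(payload);
        pending = std::move(on_done);
    }

    // Simulates the I/O layer finishing the write in flight.
    // Returns false when there was nothing to complete.
    bool complete_one() {
        if (!pending) return false;
        auto handler = std::move(pending);
        pending = nullptr; // clear first: the handler may start the next write
        handler();
        return true;
    }
};

class Session {
    FakeStream&             stream_;
    std::deque<std::string> outbox_;

  public:
    explicit Session(FakeStream& stream) : stream_(stream) {}

    // Enqueue a message; start writing only if nothing was queued before.
    void post_message(std::string msg) {
        outbox_.push_back(std::move(msg));
        if (outbox_.size() == 1)
            write_next();
    }

    std::size_t queued() const { return outbox_.size(); }

  private:
    void write_next() {
        if (outbox_.empty()) return;
        stream_.async_write(outbox_.front(), [this] {
            outbox_.pop_front(); // the front message has been written
            write_next();        // continue with the next one, if any
        });
    }
};
```

Posting "a", "b" and "c" back to back puts only "a" on the wire at first; each simulated completion then sends the next message in order. A `std::deque` is a good fit here because `push_back` does not invalidate references to existing elements, so the message being written stays valid while new ones are queued behind it.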

Listing For Reference

In case the links go dead in the future:

#include <boost/algorithm/string/trim.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <filesystem>
#include <functional>
#include <iomanip> // std::quoted
#include <iostream>

static std::string g_app_name = "app-logic-service";

#include <boost/core/demangle.hpp>  // just for our demo logic
#include <boost/spirit/home/x3.hpp> // idem
#include <numeric>                  // idem

namespace AppDomain {
    struct Logic {
        std::string banner;
        Logic(std::string msg) : banner(std::move(msg)) {}

        std::string Process(std::string request) {
            std::string result;

            auto fold = [&result](auto op, double initial) {
                return [=, &result](auto& ctx) {
                    auto& args = _attr(ctx);
                    auto  v = accumulate(args.begin(), args.end(), initial, op);
                    result  = "Fold:" + std::to_string(v);
                };
            };

            auto invalid = [&result](auto& ctx) {
                result = "Invalid Command: " + _attr(ctx);
            };

            using namespace boost::spirit::x3;
            auto args = rule<void, std::vector<double>>{} = '(' >> double_ % ',' >> ')';
            auto add = "adding"      >> args[fold(std::plus<>{}, 0)];
            auto mul = "multiplying" >> args[fold(std::multiplies<>{}, 1)];
            auto err = lexeme[+char_][invalid];

            phrase_parse(begin(request), end(request), add | mul | err, blank);

            return banner + result;
        }
    };
} // namespace AppDomain

namespace beast     = boost::beast;         // from <boost/beast.hpp>
namespace http      = beast::http;          // from <boost/beast/http.hpp>
namespace websocket = beast::websocket;     // from <boost/beast/websocket.hpp>
namespace net       = boost::asio;          // from <boost/asio.hpp>
using tcp           = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>

// Report a failure
void fail(beast::error_code ec, char const* what) {
    std::cerr << what << ": " << ec.message() << "\n";
}

class session : public std::enable_shared_from_this<session> {
    websocket::stream<beast::tcp_stream> ws_;
    beast::flat_buffer                   buffer_;
    std::string                          response_;
    std::shared_ptr<AppDomain::Logic>    logic_;

public:
    explicit session(tcp::socket&&                     socket,
                    std::shared_ptr<AppDomain::Logic> logic)
        : ws_(std::move(socket))
        , logic_(logic) {}

    void run() {
        // Get on the correct executor
        // strand for thread safety
        dispatch(
            ws_.get_executor(),
            beast::bind_front_handler(&session::on_run, shared_from_this()));
    }

private:
    void on_run() {
        // Set suggested timeout settings for the websocket
        ws_.set_option(websocket::stream_base::timeout::suggested(
            beast::role_type::server));

        // Set a decorator to change the Server of the handshake
        ws_.set_option(websocket::stream_base::decorator(
            [](websocket::response_type& res) {
                res.set(http::field::server,
                        std::string(BOOST_BEAST_VERSION_STRING) + " " +
                            g_app_name);
            }));

        // Accept the websocket handshake
        ws_.async_accept(
            beast::bind_front_handler(&session::on_accept, shared_from_this()));
    }

    void on_accept(beast::error_code ec) {
        if (ec)
            return fail(ec, "accept");

        do_read();
    }

    void do_read() {
        ws_.async_read(
            buffer_,
            beast::bind_front_handler(&session::on_read, shared_from_this()));
    }

    void on_read(beast::error_code ec, std::size_t /*bytes_transferred*/) {
        if (ec == websocket::error::closed) return;
        if (ec.failed())                    return fail(ec, "read");

        // Process the message
        auto request = boost::algorithm::trim_copy(
            beast::buffers_to_string(buffer_.data()));

        std::cout << "Processing: " << std::quoted(request) << " from "
                << beast::get_lowest_layer(ws_).socket().remote_endpoint()
                << std::endl;

        response_ = logic_->Process(request);

        ws_.async_write(
            net::buffer(response_),
            beast::bind_front_handler(&session::on_write, shared_from_this()));
    }

    void on_write(beast::error_code ec, std::size_t bytes_transferred) {
        boost::ignore_unused(bytes_transferred);

        if (ec)
            return fail(ec, "write");

        // Clear the buffer
        buffer_.consume(buffer_.size());

        // Do another read
        do_read();
    }
};

// Accepts incoming connections and launches the sessions
class listener : public std::enable_shared_from_this<listener> {
    net::any_io_executor              ex_;
    tcp::acceptor                     acceptor_;
    std::shared_ptr<AppDomain::Logic> logic_;

public:
    listener(net::any_io_executor ex, tcp::endpoint endpoint,
            std::shared_ptr<AppDomain::Logic> logic)
        : ex_(ex)
        , acceptor_(ex)
        , logic_(logic) {
        acceptor_.open(endpoint.protocol());
        acceptor_.set_option(tcp::acceptor::reuse_address(true));
        acceptor_.bind(endpoint);
        acceptor_.listen(tcp::acceptor::max_listen_connections);
    }

    // Start accepting incoming connections
    void run() { do_accept(); }

private:
    void do_accept() {
        // The new connection gets its own strand
        acceptor_.async_accept(make_strand(ex_),
                            beast::bind_front_handler(&listener::on_accept,
                                                        shared_from_this()));
    }

    void on_accept(beast::error_code ec, tcp::socket socket) {
        if (ec) {
            fail(ec, "accept");
        } else {
            std::make_shared<session>(std::move(socket), logic_)->run();
        }

        // Accept another connection
        do_accept();
    }
};

int main(int argc, char* argv[]) {
    g_app_name = std::filesystem::path(argv[0]).filename();

    if (argc != 4) {
        std::cerr << "Usage: " << g_app_name << " <address> <port> <threads>\n"
                << "Example:\n"
                << "    " << g_app_name << " 0.0.0.0 8080 1\n";
        return 1;
    }
    auto const address = net::ip::make_address(argv[1]);
    auto const port    = static_cast<uint16_t>(std::atoi(argv[2]));
    auto const threads = std::max<int>(1, std::atoi(argv[3]));

    auto logic = std::make_shared<AppDomain::Logic>("StackOverflow Demo/");

    try {
        // The thread_pool is the execution context required for all I/O
        net::thread_pool ioc(threads);

        std::make_shared<listener>(ioc.get_executor(),
                                tcp::endpoint{address, port}, logic)
            ->run();

        ioc.join();
    } catch (beast::system_error const& se) {
        fail(se.code(), "listener");
    }
}

UPDATE

In response to the comments, I have spelled out the outbox pattern once more. Note some of the comments in the code.

Compiler Explorer

#include <boost/algorithm/string/trim.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
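#include <chrono> // std::chrono_literals
#include <deque>
#include <filesystem>
#include <functional>
#include <iomanip> // std::quoted
#include <iostream>
#include <list>
#include <thread> // std::thread, std::this_thread::sleep_for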

static std::string g_app_name = "app-logic-service";

#include <boost/core/demangle.hpp>  // just for our demo logic
#include <boost/spirit/home/x3.hpp> // idem
#include <numeric>                  // idem

namespace AppDomain {
    struct Logic {
        std::string banner;
        Logic(std::string msg) : banner(std::move(msg)) {}

        std::string Process(std::string request) {
            std::string result;

            auto fold = [&result](auto op, double initial) {
                return [=, &result](auto& ctx) {
                    auto& args = _attr(ctx);
                    auto  v = accumulate(args.begin(), args.end(), initial, op);
                    result  = "Fold:" + std::to_string(v);
                };
            };

            auto invalid = [&result](auto& ctx) {
                result = "Invalid Command: " + _attr(ctx);
            };

            using namespace boost::spirit::x3;
            auto args = rule<void, std::vector<double>>{} = '(' >> double_ % ',' >> ')';
            auto add = "adding"      >> args[fold(std::plus<>{}, 0)];
            auto mul = "multiplying" >> args[fold(std::multiplies<>{}, 1)];
            auto err = lexeme[+char_][invalid];

            phrase_parse(begin(request), end(request), add | mul | err, blank);

            return banner + result;
        }
    };
} // namespace AppDomain

namespace beast     = boost::beast;         // from <boost/beast.hpp>
namespace http      = beast::http;          // from <boost/beast/http.hpp>
namespace websocket = beast::websocket;     // from <boost/beast/websocket.hpp>
namespace net       = boost::asio;          // from <boost/asio.hpp>
using tcp           = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>

// Report a failure
void fail(beast::error_code ec, char const* what) {
    std::cerr << what << ": " << ec.message() << "\n";
}

class session : public std::enable_shared_from_this<session> {
    websocket::stream<beast::tcp_stream> ws_;
    beast::flat_buffer                   buffer_;
    std::shared_ptr<AppDomain::Logic>    logic_;

public:
    explicit session(tcp::socket&&                     socket,
                    std::shared_ptr<AppDomain::Logic> logic)
        : ws_(std::move(socket))
        , logic_(logic) {}

    void run() {
        // Get on the correct executor
        // strand for thread safety
        dispatch(
            ws_.get_executor(),
            beast::bind_front_handler(&session::on_run, shared_from_this()));
    }

    void post_message(std::string msg) {
        post(ws_.get_executor(),
            [self = shared_from_this(), this, msg = std::move(msg)]() mutable {
                do_post_message(std::move(msg));
            });
    }

private:
    void on_run() {
        // on the strand
        // Set suggested timeout settings for the websocket
        ws_.set_option(websocket::stream_base::timeout::suggested(
            beast::role_type::server));

        // Set a decorator to change the Server of the handshake
        ws_.set_option(websocket::stream_base::decorator(
            [](websocket::response_type& res) {
                res.set(http::field::server,
                        std::string(BOOST_BEAST_VERSION_STRING) + " " +
                            g_app_name);
            }));

        // Accept the websocket handshake
        ws_.async_accept(
            beast::bind_front_handler(&session::on_accept, shared_from_this()));
    }

    void on_accept(beast::error_code ec) {
        // on the strand
        if (ec)
            return fail(ec, "accept");

        do_read();
    }

    void do_read() {
        // on the strand
        buffer_.clear();

        ws_.async_read(
            buffer_,
            beast::bind_front_handler(&session::on_read, shared_from_this()));
    }

    void on_read(beast::error_code ec, std::size_t /*bytes_transferred*/) {
        // on the strand
        if (ec == websocket::error::closed) return;
        if (ec.failed())                    return fail(ec, "read");

        // Process the message
        auto request = boost::algorithm::trim_copy(
            beast::buffers_to_string(buffer_.data()));

        std::cout << "Processing: " << std::quoted(request) << " from "
                << beast::get_lowest_layer(ws_).socket().remote_endpoint()
                << std::endl;

        do_post_message(logic_->Process(request)); // already on the strand

        do_read();
    }

    std::deque<std::string> _outbox;

    void do_post_message(std::string msg) {
        // on the strand
        _outbox.push_back(std::move(msg));

        if (_outbox.size() == 1)
            do_write_loop();
    }

    void do_write_loop() {
        // on the strand
        if (_outbox.empty())
            return;

        ws_.async_write( //
            net::buffer(_outbox.front()),
            [self = shared_from_this(), this] //
            (beast::error_code ec, size_t bytes_transferred) {
                // on the strand
                boost::ignore_unused(bytes_transferred);

                if (ec)
                    return fail(ec, "write");

                _outbox.pop_front();
                do_write_loop();
            });
    }
};

// Accepts incoming connections and launches the sessions
class listener : public std::enable_shared_from_this<listener> {
    net::any_io_executor              ex_;
    tcp::acceptor                     acceptor_;
    std::shared_ptr<AppDomain::Logic> logic_;

public:
    listener(net::any_io_executor ex, tcp::endpoint endpoint,
            std::shared_ptr<AppDomain::Logic> logic)
        : ex_(ex)
        , acceptor_(make_strand(ex)) // NOTE to guard sessions_
        , logic_(logic) {
        acceptor_.open(endpoint.protocol());
        acceptor_.set_option(tcp::acceptor::reuse_address(true));
        acceptor_.bind(endpoint);
        acceptor_.listen(tcp::acceptor::max_listen_connections);
    }

    // Start accepting incoming connections
    void run() { do_accept(); }

    void broadcast(std::string msg) {
        post(acceptor_.get_executor(),
            beast::bind_front_handler(&listener::do_broadcast,
                                    shared_from_this(), std::move(msg)));
    }

private:
    using handle_t = std::weak_ptr<session>;
    std::list<handle_t> sessions_;

    void do_broadcast(std::string const& msg) {
        for (auto handle : sessions_)
            if (auto sess = handle.lock())
                sess->post_message(msg);
    }

    void do_accept() {
        // The new connection gets its own strand
        acceptor_.async_accept(make_strand(ex_),
                            beast::bind_front_handler(&listener::on_accept,
                                                        shared_from_this()));
    }

    void on_accept(beast::error_code ec, tcp::socket socket) {
        // on the strand
        if (ec) {
            fail(ec, "accept");
        } else {
            auto sess = std::make_shared<session>(std::move(socket), logic_);
            sessions_.emplace_back(sess);
            // optionally:
            sessions_.remove_if(std::mem_fn(&handle_t::expired));
            sess->run();
        }

        // Accept another connection
        do_accept();
    }
};

static void emulate_hardware_stuff(std::shared_ptr<listener> srv) {
    using std::this_thread::sleep_for;
    using namespace std::chrono_literals;
    // Extremely simplistic. Instead I'd recommend `steady_timer` with
    // `async_wait` here, but since I'm just making a sketch...
    unsigned i = 0;

    while (true) {
        sleep_for(1s);
        srv->broadcast("Hardware thing #" + std::to_string(++i));
    }
}

int main(int argc, char* argv[]) {
    g_app_name = std::filesystem::path(argv[0]).filename();

    if (argc != 4) {
        std::cerr << "Usage: " << g_app_name << " <address> <port> <threads>\n"
                << "Example:\n"
                << "    " << g_app_name << " 0.0.0.0 8080 1\n";
        return 1;
    }
    auto const address = net::ip::make_address(argv[1]);
    auto const port    = static_cast<uint16_t>(std::atoi(argv[2]));
    auto const threads = std::max<int>(1, std::atoi(argv[3]));

    auto logic = std::make_shared<AppDomain::Logic>("StackOverflow Demo/");

    try {
        // The thread_pool is the execution context required for all I/O
        net::thread_pool ioc(threads);

        auto srv = std::make_shared<listener>( //
            ioc.get_executor(),                     //
            tcp::endpoint{address, port},           //
            logic);

        srv->run();

        std::thread something_hardware(emulate_hardware_stuff, srv);

        ioc.join();
        something_hardware.join();
    } catch (beast::system_error const& se) {
        fail(se.code(), "listener");
    }
}

Live demo: