Boost::Beast : 带有 websocket 流水线的服务器
Boost::Beast : server with websocket pipelining
我正在编写一个带有 boost beast 1.70 和 mysql 8 C 连接器的 c++ websocket 服务器。服务器将同时连接多个客户端。特殊之处在于每个客户端将向服务器连续执行 100 个 websocket 请求。我的服务器的每个请求都是 "cpu light",但服务器对每个请求执行 "time heavy" sql 请求。
我已经用 websocket_server_coro.cpp 示例启动了我的服务器。服务器步骤是:
1) websocket 读取
2) 一个 sql 请求
3) websocket 写入
问题是对于给定的用户,服务器在第 2 步 "locked" 并且在这一步和第 3 步完成之前无法读取。因此,这 100 个请求是按顺序解决的。这对我的用例来说太慢了。
我读到非阻塞 read/write 无法使用 boost beast。但是,我现在要做的是在协程中执行 async_read 和 async_write。
void ServerCoro::accept(websocket::stream<beast::tcp_stream> &ws) {
beast::error_code ec;
ws.set_option(websocket::stream_base::timeout::suggested(beast::role_type::server));
ws.set_option(websocket::stream_base::decorator([](websocket::response_type &res) {
res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-Server-coro");
}));
ws.async_accept(yield[ec]);
if (ec) return fail(ec, "accept");
while (!_bStop) {
beast::flat_buffer buffer;
ws.async_read(buffer, yield[ec]);
if (ec == websocket::error::closed) {
std::cout << "=> get closed" << std::endl;
return;
}
if (ec) return fail(ec, "read");
auto buffer_str = new std::string(boost::beast::buffers_to_string(buffer.cdata()));
net::post([&, buffer_str] {
// sql async request such as :
// while (status == (mysql_real_query_nonblocking(this->con, sqlRequest.c_str(), sqlRequest.size()))) {
// ioc.poll_one(ec);
// }
// more sql ...
ws.async_write(net::buffer(worker->getResponse()), yield[ec]); // this line is throwing void boost::coroutines::detail::pull_coroutine_impl<void>::pull(): Assertion `! is_running()' failed.
if (ec) return fail(ec, "write");
});
}
}
问题是带有 async_write 的行抛出错误:
void boost::coroutines::detail::pull_coroutine_impl::pull(): 断言`! is_running()' 失败。
如果用 sync_write 替换此行,它可以工作,但服务器对给定用户保持顺序。
我试图在单线程服务器上执行此代码。我还尝试对 async_read 和 async_write 使用相同的链。仍然有断言错误。
对于 websocket 的 boost beast,这样的服务器是不可能的吗?
谢谢。
根据Vinnie Falco的建议,我以"websocket chat"和"async server"为例重写了代码。这是代码的最终工作结果:
void Session::on_read(beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if(ec == websocket::error::closed) return; // This indicates that the Session was closed
if(ec) return fail(ec, "read");
net::post([&, that = shared_from_this(), ss = std::make_shared<std::string const>(std::move(boost::beast::buffers_to_string(_buffer.cdata())))] {
/* Sql things that call ioc.poll_one(ec) HERE, for me the sql response go inside worker.getResponse() used below */
net::dispatch(_wsStrand, [&, that = shared_from_this(), sss = std::make_shared < std::string const>(worker.getResponse())] {
async_write(sss);
});
});
_buffer.consume(_buffer.size()); // we remove from the buffer what we just read
do_read(); // go for another read
}
void Session::async_write(const std::shared_ptr<std::string const> &message) {
_writeMessages.push_back(message);
if (_writeMessages.size() > 1) {
BOOST_LOG_TRIVIAL(warning) << "WRITE IS LOCKED";
} else {
_ws.text(_ws.got_text());
_ws.async_write(net::buffer(*_writeMessages.front()), boost::asio::bind_executor(_wsStrand, beast::bind_front_handler(
&Session::on_write, this)));
}
}
void Session::on_write(beast::error_code ec, std::size_t)
{
// Handle the error, if any
if(ec) return fail(ec, "write");
// Remove the string from the queue
_writeMessages.erase(_writeMessages.begin());
// Send the next message if any
if(!_writeMessages.empty())
_ws.async_write(net::buffer(*_writeMessages.front()), boost::asio::bind_executor(_wsStrand, beast::bind_front_handler(
&Session::on_write, this)));
}
谢谢。
我正在编写一个带有 boost beast 1.70 和 mysql 8 C 连接器的 c++ websocket 服务器。服务器将同时连接多个客户端。特殊之处在于每个客户端将向服务器连续执行 100 个 websocket 请求。我的服务器的每个请求都是 "cpu light",但服务器对每个请求执行 "time heavy" sql 请求。
我已经用 websocket_server_coro.cpp 示例启动了我的服务器。服务器步骤是:
1) websocket 读取
2) 一个 sql 请求
3) websocket 写入
问题是对于给定的用户,服务器在第 2 步 "locked" 并且在这一步和第 3 步完成之前无法读取。因此,这 100 个请求是按顺序解决的。这对我的用例来说太慢了。
我读到非阻塞 read/write 无法使用 boost beast。但是,我现在要做的是在协程中执行 async_read 和 async_write。
void ServerCoro::accept(websocket::stream<beast::tcp_stream> &ws) {
beast::error_code ec;
ws.set_option(websocket::stream_base::timeout::suggested(beast::role_type::server));
ws.set_option(websocket::stream_base::decorator([](websocket::response_type &res) {
res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-Server-coro");
}));
ws.async_accept(yield[ec]);
if (ec) return fail(ec, "accept");
while (!_bStop) {
beast::flat_buffer buffer;
ws.async_read(buffer, yield[ec]);
if (ec == websocket::error::closed) {
std::cout << "=> get closed" << std::endl;
return;
}
if (ec) return fail(ec, "read");
auto buffer_str = new std::string(boost::beast::buffers_to_string(buffer.cdata()));
net::post([&, buffer_str] {
// sql async request such as :
// while (status == (mysql_real_query_nonblocking(this->con, sqlRequest.c_str(), sqlRequest.size()))) {
// ioc.poll_one(ec);
// }
// more sql ...
ws.async_write(net::buffer(worker->getResponse()), yield[ec]); // this line is throwing void boost::coroutines::detail::pull_coroutine_impl<void>::pull(): Assertion `! is_running()' failed.
if (ec) return fail(ec, "write");
});
}
}
问题是带有 async_write 的行抛出错误:
void boost::coroutines::detail::pull_coroutine_impl::pull(): 断言`! is_running()' 失败。
如果用 sync_write 替换此行,它可以工作,但服务器对给定用户保持顺序。 我试图在单线程服务器上执行此代码。我还尝试对 async_read 和 async_write 使用相同的链。仍然有断言错误。
对于 websocket 的 boost beast,这样的服务器是不可能的吗? 谢谢。
根据Vinnie Falco的建议,我以"websocket chat"和"async server"为例重写了代码。这是代码的最终工作结果:
void Session::on_read(beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if(ec == websocket::error::closed) return; // This indicates that the Session was closed
if(ec) return fail(ec, "read");
net::post([&, that = shared_from_this(), ss = std::make_shared<std::string const>(std::move(boost::beast::buffers_to_string(_buffer.cdata())))] {
/* Sql things that call ioc.poll_one(ec) HERE, for me the sql response go inside worker.getResponse() used below */
net::dispatch(_wsStrand, [&, that = shared_from_this(), sss = std::make_shared < std::string const>(worker.getResponse())] {
async_write(sss);
});
});
_buffer.consume(_buffer.size()); // we remove from the buffer what we just read
do_read(); // go for another read
}
void Session::async_write(const std::shared_ptr<std::string const> &message) {
_writeMessages.push_back(message);
if (_writeMessages.size() > 1) {
BOOST_LOG_TRIVIAL(warning) << "WRITE IS LOCKED";
} else {
_ws.text(_ws.got_text());
_ws.async_write(net::buffer(*_writeMessages.front()), boost::asio::bind_executor(_wsStrand, beast::bind_front_handler(
&Session::on_write, this)));
}
}
void Session::on_write(beast::error_code ec, std::size_t)
{
// Handle the error, if any
if(ec) return fail(ec, "write");
// Remove the string from the queue
_writeMessages.erase(_writeMessages.begin());
// Send the next message if any
if(!_writeMessages.empty())
_ws.async_write(net::buffer(*_writeMessages.front()), boost::asio::bind_executor(_wsStrand, beast::bind_front_handler(
&Session::on_write, this)));
}
谢谢。