Boost::Asio :为什么 async_write 在通过给定套接字发送缓冲区时会截断缓冲区?
Boost::Asio : Why does async_write truncate the buffer when sending it through the given socket?
我目前正在尝试设计一个相当简单的 boost::asio
服务器。我的第一个单元测试相当简单:发送一个 JSON 请求 {"COMMAND": "ADD_1", "VALUE" : 1}
并收到以下响应:
{
"SUCCESS" : true,
"VALUE" : 2
}
然而,从套接字 read
之后,回复被截断了一个字符:
Reply is: {
"SUCCESS" : true,
"VALUE" : 2
Process finished with exit code 0
写入套接字的代码相当简单,一个成员函数class RequestContext
:
void RequestContext::DoWrite(std::size_t length)
{
JSONCPP_STRING parse_err;
Json::Value json_req, json_resp;
auto self(this->shared_from_this());
std::string client_req_str(data_);
if (reader_->parse(client_req_str.c_str(),
client_req_str.c_str() +
client_req_str.length(),
&json_req, &parse_err))
{
try {
// Get JSON response.
json_resp = ProcessRequest(json_req);
json_resp["SUCCESS"] = true;
} catch (const std::exception &ex) {
// If json parsing failed.
json_resp["SUCCESS"] = false;
json_resp["ERRORS"] = std::string(ex.what());
}
} else {
// If json parsing failed.
json_resp["SUCCESS"] = false;
json_resp["ERRORS"] = std::string(parse_err);
}
std::string resp = Json::writeString(writer_, json_resp);
boost::asio::async_write(socket_,
boost::asio::buffer(&resp[0], resp.size()),
[this, self]
(boost::system::error_code ec,
std::size_t bytes_xfered) {
if (!ec) DoRead();
});
}
我已经验证 ProcessRequest
returns 是正确的值,所以问题显然出在 async_write
上。我试过将第二个参数的值增加到 async_write
,但似乎没有效果。我做错了什么?
可以在下面找到一个最小的可重现示例:
#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>
#include <json/json.h>
using boost::asio::ip::tcp;
using boost::system::error_code;
/// NOTE: This class exists exclusively for unit testing.
class RequestClass {
public:
/**
* Initialize class with value n to add sub from input values.
*
* @param n Value to add/sub from input values.
*/
explicit RequestClass(int n) : n_(n) {}
/// Value to add/sub from
int n_;
/**
* Add n to value in JSON request.
*
* @param request JSON request with field "value".
* @return JSON response containing modified field "value" = [original_value] + n.
*/
[[nodiscard]] Json::Value add_n(const Json::Value &request) const
{
Json::Value resp;
resp["SUCCESS"] = true;
// If value is present in request, return value + 1, else return error.
if (request.get("VALUE", NULL) != NULL) {
resp["VALUE"] = request["VALUE"].asInt() + this->n_;
} else {
resp["SUCCESS"] = false;
resp["ERRORS"] = "Invalid value.";
}
return resp;
}
/**
* Sun n from value in JSON request.
*
* @param request JSON request with field "value".
* @return JSON response containing modified field "value" = [original_value] - n.
*/
[[nodiscard]] Json::Value sub_n(const Json::Value &request) const
{
Json::Value resp, value;
resp["SUCCESS"] = true;
// If value is present in request, return value + 1, else return error.
if (request.get("VALUE", NULL) != NULL) {
resp["VALUE"] = request["VALUE"].asInt() - this->n_;
} else {
resp["SUCCESS"] = false;
resp["ERRORS"] = "Invalid value.";
}
return resp;
}
};
typedef std::function<Json::Value(RequestClass, const Json::Value &)> RequestClassMethod;
template<class RequestHandler, class RequestClass>
class RequestContext :
public std::enable_shared_from_this<RequestContext<RequestHandler,
RequestClass>>
{
public:
typedef std::map<std::string, RequestHandler> CommandMap;
RequestContext(tcp::socket socket, CommandMap commands,
RequestClass *request_class_inst)
: socket_(std::move(socket))
, commands_(std::move(commands))
, request_class_inst_(request_class_inst)
, reader_((new Json::CharReaderBuilder)->newCharReader())
{}
void Run()
{
DoRead();
}
void Kill()
{
continue_ = false;
}
private:
tcp::socket socket_;
RequestClass *request_class_inst_;
CommandMap commands_;
/// Reads JSON.
const std::unique_ptr<Json::CharReader> reader_;
/// Writes JSON.
Json::StreamWriterBuilder writer_;
bool continue_ = true;
char data_[2048];
void DoRead()
{
auto self(this->shared_from_this());
socket_.async_read_some(boost::asio::buffer(data_, 2048),
[this, self](error_code ec, std::size_t length)
{
if (!ec)
{
DoWrite(length);
}
});
}
void DoWrite(std::size_t length)
{
JSONCPP_STRING parse_err;
Json::Value json_req, json_resp;
auto self(this->shared_from_this());
std::string client_req_str(data_);
if (reader_->parse(client_req_str.c_str(),
client_req_str.c_str() +
client_req_str.length(),
&json_req, &parse_err))
{
try {
// Get JSON response.
json_resp = ProcessRequest(json_req);
json_resp["SUCCESS"] = true;
} catch (const std::exception &ex) {
// If json parsing failed.
json_resp["SUCCESS"] = false;
json_resp["ERRORS"] = std::string(ex.what());
}
} else {
// If json parsing failed.
json_resp["SUCCESS"] = false;
json_resp["ERRORS"] = std::string(parse_err);
}
std::string resp = Json::writeString(writer_, json_resp);
boost::asio::async_write(socket_,
boost::asio::buffer(&resp[0], resp.size()),
[this, self]
(boost::system::error_code ec,
std::size_t bytes_xfered) {
if (!ec) DoRead();
});
}
Json::Value ProcessRequest(Json::Value request)
{
Json::Value response;
std::string command = request["COMMAND"].asString();
// If command is not valid, give a response with an error.
if(commands_.find(command) == commands_.end()) {
response["SUCCESS"] = false;
response["ERRORS"] = "Invalid command.";
}
// Otherwise, run the relevant handler.
else {
RequestHandler handler = commands_.at(command);
response = handler(*request_class_inst_, request);
}
return response;
}
};
template<class RequestHandler, class RequestClass>
class Server {
public:
typedef std::map<std::string, RequestHandler> CommandMap;
Server(boost::asio::io_context &io_context, short port,
const CommandMap &commands,
RequestClass *request_class_inst)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port))
, commands_(commands)
, request_class_inst_(request_class_inst)
{
DoAccept();
}
~Server()
{
Kill();
}
void Kill()
{
continue_ = false;
}
private:
tcp::acceptor acceptor_;
bool continue_ = true;
CommandMap commands_;
RequestClass *request_class_inst_;
void DoAccept()
{
acceptor_.async_accept(
[this](boost::system::error_code ec, tcp::socket socket) {
if (!ec)
std::make_shared<RequestContext<RequestHandler, RequestClass>>
(std::move(socket), commands_, request_class_inst_)->Run();
DoAccept();
});
}
};
void RunServer(short port)
{
boost::asio::io_context io_context;
auto *request_inst = new RequestClass(1);
std::map<std::string, RequestClassMethod> commands {
{"ADD_1", std::mem_fn(&RequestClass::add_n)},
{"SUB_1", std::mem_fn(&RequestClass::sub_n)}
};
Server<RequestClassMethod, RequestClass> s(io_context, port, commands,
request_inst);
io_context.run();
}
void RunServerInBackground(short port)
{
std::thread t([port] { RunServer(port); });
t.detach();
}
int main()
{
try
{
RunServerInBackground(5000);
boost::asio::io_context io_context;
tcp::socket s(io_context);
tcp::resolver resolver(io_context);
boost::asio::connect(s, resolver.resolve("127.0.0.1", "5000"));
char request[2048] = R"({"COMMAND": "ADD_1", "VALUE" : 1})";
size_t request_length = std::strlen(request);
boost::asio::write(s, boost::asio::buffer(request, request_length));
char reply[2048];
size_t reply_length = boost::asio::read(s, boost::asio::buffer(reply, request_length));
std::cout << "Reply is: ";
std::cout << reply << std::endl;
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
传出缓冲区需要是class成员,就像data_
一样,这样才能保证生命周期直到async_write
完成。
您还可以通过 linter/runtime 检查 ASAN/UBSAN 或 Valgrind 来发现此类问题。
更新
还有
size_t reply_length =
boost::asio::read(s, boost::asio::buffer(reply, request_length));
错误地使用了 request_length
。通常,请避免在任何时候手动指定缓冲区大小。
此外,您的协议不提供框架,因此您实际上无法为更新的请求保持相同的连接打开(您不知道要完成响应需要多少字节)。我将在第一次请求后通过关闭连接来“修复”它,这样我们就有了一个工作演示。
还有一个带有 continue_
标志的竞争条件,但我会把它留作 reader.
的驱魔
当然,考虑不泄露请求class实例。
哦,我也切换到 Boost JSON,因为它看起来更适合:
#include <boost/asio.hpp>
#include <boost/json.hpp>
#include <boost/json/src.hpp>
#include <iostream>
using boost::asio::ip::tcp;
using boost::system::error_code;
namespace json = boost::json;
using Value = json::object;
/// NOTE: This class exists exclusively for unit testing.
struct Sample {
int n_;
Value add_n(Value const& request) const { return impl(std::plus<>{}, request); }
Value sub_n(Value const& request) const { return impl(std::minus<>{}, request); }
Value mul_n(Value const& request) const { return impl(std::multiplies<>{}, request); }
Value div_n(Value const& request) const { return impl(std::divides<>{}, request); }
private:
template <typename Op> Value impl(Op op, Value const& req) const {
return (req.contains("VALUE"))
? Value{{"VALUE", op(req.at("VALUE").as_int64(), n_)},
{"SUCCESS", true}}
: Value{{"ERRORS", "Invalid value."}, {"SUCCESS", false}};
}
};
using RequestClassMethod =
std::function<Value(Sample, Value const&)>;
template <class RequestHandler, class RequestClass>
class RequestContext
: public std::enable_shared_from_this<
RequestContext<RequestHandler, RequestClass>> {
public:
using CommandMap = std::map<std::string, RequestHandler>;
RequestContext(tcp::socket socket, CommandMap commands,
RequestClass* request_class_inst)
: socket_(std::move(socket))
, commands_(std::move(commands))
, request_class_inst_(request_class_inst)
{}
void Run() { DoRead(); }
void Kill() { continue_ = false; }
private:
tcp::socket socket_;
CommandMap commands_;
RequestClass* request_class_inst_;
bool continue_ = true;
char data_[2048];
std::string resp_;
void DoRead()
{
socket_.async_read_some(
boost::asio::buffer(data_),
[this, self = this->shared_from_this()](error_code ec, std::size_t length) {
if (!ec) {
DoWrite(length);
}
});
}
void DoWrite(std::size_t length)
{
Value json_resp;
try {
auto json_req = json::parse({data_, length}).as_object();
json_resp = ProcessRequest(json_req);
json_resp["SUCCESS"] = true;
} catch (std::exception const& ex) {
json_resp = {{"SUCCESS", false}, {"ERRORS", ex.what()}};
}
resp_ = json::serialize(json_resp);
boost::asio::async_write(socket_, boost::asio::buffer(resp_),
[this, self = this->shared_from_this()](
error_code ec, size_t bytes_xfered) {
if (!ec)
DoRead();
});
}
Value ProcessRequest(Value request)
{
auto command = request.contains("COMMAND")
? request["COMMAND"].as_string() //
: "";
std::string cmdstr(command.data(), command.size());
// If command is not valid, give a response with an error.
return commands_.contains(cmdstr) && request_class_inst_
? commands_.at(cmdstr)(*request_class_inst_, request)
: Value{{"SUCCESS", false}, {"ERRORS","Invalid command."}};
}
};
template<class RequestHandler, class RequestClass>
class Server {
public:
using CommandMap = std::map<std::string, RequestHandler>;
Server(boost::asio::io_context& io_context, uint16_t port,
const CommandMap& commands, RequestClass* request_class_inst)
: acceptor_(io_context, {{}, port})
, commands_(commands)
, request_class_inst_(request_class_inst)
{
DoAccept();
}
~Server() { Kill(); }
void Kill() { continue_ = false; }
private:
tcp::acceptor acceptor_;
bool continue_ = true;
CommandMap commands_;
RequestClass *request_class_inst_;
void DoAccept()
{
acceptor_.async_accept(
[this](error_code ec, tcp::socket socket) {
if (!ec)
std::make_shared<
RequestContext<RequestHandler, RequestClass>>(
std::move(socket), commands_, request_class_inst_)
->Run();
DoAccept();
});
}
};
void RunServer(uint16_t port)
{
boost::asio::io_context io_context;
Server<RequestClassMethod, Sample> s(
io_context, port,
{{"ADD_2", std::mem_fn(&Sample::add_n)},
{"SUB_2", std::mem_fn(&Sample::sub_n)},
{"MUL_2", std::mem_fn(&Sample::mul_n)},
{"DIV_2", std::mem_fn(&Sample::div_n)}},
new Sample{2});
io_context.run();
}
void RunServerInBackground(uint16_t port)
{
std::thread t([port] { RunServer(port); });
t.detach();
}
int main() try {
RunServerInBackground(5000);
::sleep(1); // avoid startup race
boost::asio::io_context io;
tcp::socket s(io);
s.connect({{}, 5000});
std::string const request = R"({"COMMAND": "MUL_2", "VALUE" : 21})";
std::cout << "Request: " << std::quoted(request, '\'') << std::endl;
boost::asio::write(s, boost::asio::buffer(request));
s.shutdown(tcp::socket::shutdown_send); // avoid framing problems
error_code ec;
char reply[2048];
size_t reply_length = boost::asio::read(s, boost::asio::buffer(reply), ec);
std::cout << "Reply is: "
<< std::quoted(std::string_view(reply, reply_length), '\'')
<< " (" << ec.message() << ")" << std::endl;
} catch (std::exception const& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
版画
Request: '{"COMMAND": "MUL_2", "VALUE" : 21}'
Reply is: '{"VALUE":42,"SUCCESS":true}' (End of file)
我目前正在尝试设计一个相当简单的 boost::asio
服务器。我的第一个单元测试相当简单:发送一个 JSON 请求 {"COMMAND": "ADD_1", "VALUE" : 1}
并收到以下响应:
{
"SUCCESS" : true,
"VALUE" : 2
}
然而,从套接字 read
之后,回复被截断了一个字符:
Reply is: {
"SUCCESS" : true,
"VALUE" : 2
Process finished with exit code 0
写入套接字的代码相当简单,一个成员函数class RequestContext
:
void RequestContext::DoWrite(std::size_t length)
{
JSONCPP_STRING parse_err;
Json::Value json_req, json_resp;
auto self(this->shared_from_this());
std::string client_req_str(data_);
if (reader_->parse(client_req_str.c_str(),
client_req_str.c_str() +
client_req_str.length(),
&json_req, &parse_err))
{
try {
// Get JSON response.
json_resp = ProcessRequest(json_req);
json_resp["SUCCESS"] = true;
} catch (const std::exception &ex) {
// If json parsing failed.
json_resp["SUCCESS"] = false;
json_resp["ERRORS"] = std::string(ex.what());
}
} else {
// If json parsing failed.
json_resp["SUCCESS"] = false;
json_resp["ERRORS"] = std::string(parse_err);
}
std::string resp = Json::writeString(writer_, json_resp);
boost::asio::async_write(socket_,
boost::asio::buffer(&resp[0], resp.size()),
[this, self]
(boost::system::error_code ec,
std::size_t bytes_xfered) {
if (!ec) DoRead();
});
}
我已经验证 ProcessRequest
returns 是正确的值,所以问题显然出在 async_write
上。我试过将第二个参数的值增加到 async_write
,但似乎没有效果。我做错了什么?
可以在下面找到一个最小的可重现示例:
#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>
#include <json/json.h>
using boost::asio::ip::tcp;
using boost::system::error_code;
/// NOTE: This class exists exclusively for unit testing.
class RequestClass {
public:
/**
* Initialize class with value n to add sub from input values.
*
* @param n Value to add/sub from input values.
*/
explicit RequestClass(int n) : n_(n) {}
/// Value to add/sub from
int n_;
/**
* Add n to value in JSON request.
*
* @param request JSON request with field "value".
* @return JSON response containing modified field "value" = [original_value] + n.
*/
[[nodiscard]] Json::Value add_n(const Json::Value &request) const
{
Json::Value resp;
resp["SUCCESS"] = true;
// If value is present in request, return value + 1, else return error.
if (request.get("VALUE", NULL) != NULL) {
resp["VALUE"] = request["VALUE"].asInt() + this->n_;
} else {
resp["SUCCESS"] = false;
resp["ERRORS"] = "Invalid value.";
}
return resp;
}
/**
* Sun n from value in JSON request.
*
* @param request JSON request with field "value".
* @return JSON response containing modified field "value" = [original_value] - n.
*/
[[nodiscard]] Json::Value sub_n(const Json::Value &request) const
{
Json::Value resp, value;
resp["SUCCESS"] = true;
// If value is present in request, return value + 1, else return error.
if (request.get("VALUE", NULL) != NULL) {
resp["VALUE"] = request["VALUE"].asInt() - this->n_;
} else {
resp["SUCCESS"] = false;
resp["ERRORS"] = "Invalid value.";
}
return resp;
}
};
typedef std::function<Json::Value(RequestClass, const Json::Value &)> RequestClassMethod;
template<class RequestHandler, class RequestClass>
class RequestContext :
public std::enable_shared_from_this<RequestContext<RequestHandler,
RequestClass>>
{
public:
typedef std::map<std::string, RequestHandler> CommandMap;
RequestContext(tcp::socket socket, CommandMap commands,
RequestClass *request_class_inst)
: socket_(std::move(socket))
, commands_(std::move(commands))
, request_class_inst_(request_class_inst)
, reader_((new Json::CharReaderBuilder)->newCharReader())
{}
void Run()
{
DoRead();
}
void Kill()
{
continue_ = false;
}
private:
tcp::socket socket_;
RequestClass *request_class_inst_;
CommandMap commands_;
/// Reads JSON.
const std::unique_ptr<Json::CharReader> reader_;
/// Writes JSON.
Json::StreamWriterBuilder writer_;
bool continue_ = true;
char data_[2048];
void DoRead()
{
auto self(this->shared_from_this());
socket_.async_read_some(boost::asio::buffer(data_, 2048),
[this, self](error_code ec, std::size_t length)
{
if (!ec)
{
DoWrite(length);
}
});
}
void DoWrite(std::size_t length)
{
JSONCPP_STRING parse_err;
Json::Value json_req, json_resp;
auto self(this->shared_from_this());
std::string client_req_str(data_);
if (reader_->parse(client_req_str.c_str(),
client_req_str.c_str() +
client_req_str.length(),
&json_req, &parse_err))
{
try {
// Get JSON response.
json_resp = ProcessRequest(json_req);
json_resp["SUCCESS"] = true;
} catch (const std::exception &ex) {
// If json parsing failed.
json_resp["SUCCESS"] = false;
json_resp["ERRORS"] = std::string(ex.what());
}
} else {
// If json parsing failed.
json_resp["SUCCESS"] = false;
json_resp["ERRORS"] = std::string(parse_err);
}
std::string resp = Json::writeString(writer_, json_resp);
boost::asio::async_write(socket_,
boost::asio::buffer(&resp[0], resp.size()),
[this, self]
(boost::system::error_code ec,
std::size_t bytes_xfered) {
if (!ec) DoRead();
});
}
Json::Value ProcessRequest(Json::Value request)
{
Json::Value response;
std::string command = request["COMMAND"].asString();
// If command is not valid, give a response with an error.
if(commands_.find(command) == commands_.end()) {
response["SUCCESS"] = false;
response["ERRORS"] = "Invalid command.";
}
// Otherwise, run the relevant handler.
else {
RequestHandler handler = commands_.at(command);
response = handler(*request_class_inst_, request);
}
return response;
}
};
template<class RequestHandler, class RequestClass>
class Server {
public:
typedef std::map<std::string, RequestHandler> CommandMap;
Server(boost::asio::io_context &io_context, short port,
const CommandMap &commands,
RequestClass *request_class_inst)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port))
, commands_(commands)
, request_class_inst_(request_class_inst)
{
DoAccept();
}
~Server()
{
Kill();
}
void Kill()
{
continue_ = false;
}
private:
tcp::acceptor acceptor_;
bool continue_ = true;
CommandMap commands_;
RequestClass *request_class_inst_;
void DoAccept()
{
acceptor_.async_accept(
[this](boost::system::error_code ec, tcp::socket socket) {
if (!ec)
std::make_shared<RequestContext<RequestHandler, RequestClass>>
(std::move(socket), commands_, request_class_inst_)->Run();
DoAccept();
});
}
};
void RunServer(short port)
{
boost::asio::io_context io_context;
auto *request_inst = new RequestClass(1);
std::map<std::string, RequestClassMethod> commands {
{"ADD_1", std::mem_fn(&RequestClass::add_n)},
{"SUB_1", std::mem_fn(&RequestClass::sub_n)}
};
Server<RequestClassMethod, RequestClass> s(io_context, port, commands,
request_inst);
io_context.run();
}
void RunServerInBackground(short port)
{
std::thread t([port] { RunServer(port); });
t.detach();
}
int main()
{
try
{
RunServerInBackground(5000);
boost::asio::io_context io_context;
tcp::socket s(io_context);
tcp::resolver resolver(io_context);
boost::asio::connect(s, resolver.resolve("127.0.0.1", "5000"));
char request[2048] = R"({"COMMAND": "ADD_1", "VALUE" : 1})";
size_t request_length = std::strlen(request);
boost::asio::write(s, boost::asio::buffer(request, request_length));
char reply[2048];
size_t reply_length = boost::asio::read(s, boost::asio::buffer(reply, request_length));
std::cout << "Reply is: ";
std::cout << reply << std::endl;
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
传出缓冲区需要是class成员,就像data_
一样,这样才能保证生命周期直到async_write
完成。
您还可以通过 linter/runtime 检查 ASAN/UBSAN 或 Valgrind 来发现此类问题。
更新
还有
size_t reply_length =
boost::asio::read(s, boost::asio::buffer(reply, request_length));
错误地使用了 request_length
。通常,请避免在任何时候手动指定缓冲区大小。
此外,您的协议不提供框架,因此您实际上无法为更新的请求保持相同的连接打开(您不知道要完成响应需要多少字节)。我将在第一次请求后通过关闭连接来“修复”它,这样我们就有了一个工作演示。
还有一个带有 continue_
标志的竞争条件,但我会把它留作 reader.
当然,考虑不泄露请求class实例。
哦,我也切换到 Boost JSON,因为它看起来更适合:
#include <boost/asio.hpp>
#include <boost/json.hpp>
#include <boost/json/src.hpp>
#include <iostream>
using boost::asio::ip::tcp;
using boost::system::error_code;
namespace json = boost::json;
using Value = json::object;
/// NOTE: This class exists exclusively for unit testing.
struct Sample {
int n_;
Value add_n(Value const& request) const { return impl(std::plus<>{}, request); }
Value sub_n(Value const& request) const { return impl(std::minus<>{}, request); }
Value mul_n(Value const& request) const { return impl(std::multiplies<>{}, request); }
Value div_n(Value const& request) const { return impl(std::divides<>{}, request); }
private:
template <typename Op> Value impl(Op op, Value const& req) const {
return (req.contains("VALUE"))
? Value{{"VALUE", op(req.at("VALUE").as_int64(), n_)},
{"SUCCESS", true}}
: Value{{"ERRORS", "Invalid value."}, {"SUCCESS", false}};
}
};
using RequestClassMethod =
std::function<Value(Sample, Value const&)>;
template <class RequestHandler, class RequestClass>
class RequestContext
: public std::enable_shared_from_this<
RequestContext<RequestHandler, RequestClass>> {
public:
using CommandMap = std::map<std::string, RequestHandler>;
RequestContext(tcp::socket socket, CommandMap commands,
RequestClass* request_class_inst)
: socket_(std::move(socket))
, commands_(std::move(commands))
, request_class_inst_(request_class_inst)
{}
void Run() { DoRead(); }
void Kill() { continue_ = false; }
private:
tcp::socket socket_;
CommandMap commands_;
RequestClass* request_class_inst_;
bool continue_ = true;
char data_[2048];
std::string resp_;
void DoRead()
{
socket_.async_read_some(
boost::asio::buffer(data_),
[this, self = this->shared_from_this()](error_code ec, std::size_t length) {
if (!ec) {
DoWrite(length);
}
});
}
void DoWrite(std::size_t length)
{
Value json_resp;
try {
auto json_req = json::parse({data_, length}).as_object();
json_resp = ProcessRequest(json_req);
json_resp["SUCCESS"] = true;
} catch (std::exception const& ex) {
json_resp = {{"SUCCESS", false}, {"ERRORS", ex.what()}};
}
resp_ = json::serialize(json_resp);
boost::asio::async_write(socket_, boost::asio::buffer(resp_),
[this, self = this->shared_from_this()](
error_code ec, size_t bytes_xfered) {
if (!ec)
DoRead();
});
}
Value ProcessRequest(Value request)
{
auto command = request.contains("COMMAND")
? request["COMMAND"].as_string() //
: "";
std::string cmdstr(command.data(), command.size());
// If command is not valid, give a response with an error.
return commands_.contains(cmdstr) && request_class_inst_
? commands_.at(cmdstr)(*request_class_inst_, request)
: Value{{"SUCCESS", false}, {"ERRORS","Invalid command."}};
}
};
template<class RequestHandler, class RequestClass>
class Server {
public:
using CommandMap = std::map<std::string, RequestHandler>;
Server(boost::asio::io_context& io_context, uint16_t port,
const CommandMap& commands, RequestClass* request_class_inst)
: acceptor_(io_context, {{}, port})
, commands_(commands)
, request_class_inst_(request_class_inst)
{
DoAccept();
}
~Server() { Kill(); }
void Kill() { continue_ = false; }
private:
tcp::acceptor acceptor_;
bool continue_ = true;
CommandMap commands_;
RequestClass *request_class_inst_;
void DoAccept()
{
acceptor_.async_accept(
[this](error_code ec, tcp::socket socket) {
if (!ec)
std::make_shared<
RequestContext<RequestHandler, RequestClass>>(
std::move(socket), commands_, request_class_inst_)
->Run();
DoAccept();
});
}
};
void RunServer(uint16_t port)
{
boost::asio::io_context io_context;
Server<RequestClassMethod, Sample> s(
io_context, port,
{{"ADD_2", std::mem_fn(&Sample::add_n)},
{"SUB_2", std::mem_fn(&Sample::sub_n)},
{"MUL_2", std::mem_fn(&Sample::mul_n)},
{"DIV_2", std::mem_fn(&Sample::div_n)}},
new Sample{2});
io_context.run();
}
void RunServerInBackground(uint16_t port)
{
std::thread t([port] { RunServer(port); });
t.detach();
}
int main() try {
RunServerInBackground(5000);
::sleep(1); // avoid startup race
boost::asio::io_context io;
tcp::socket s(io);
s.connect({{}, 5000});
std::string const request = R"({"COMMAND": "MUL_2", "VALUE" : 21})";
std::cout << "Request: " << std::quoted(request, '\'') << std::endl;
boost::asio::write(s, boost::asio::buffer(request));
s.shutdown(tcp::socket::shutdown_send); // avoid framing problems
error_code ec;
char reply[2048];
size_t reply_length = boost::asio::read(s, boost::asio::buffer(reply), ec);
std::cout << "Reply is: "
<< std::quoted(std::string_view(reply, reply_length), '\'')
<< " (" << ec.message() << ")" << std::endl;
} catch (std::exception const& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
版画
Request: '{"COMMAND": "MUL_2", "VALUE" : 21}'
Reply is: '{"VALUE":42,"SUCCESS":true}' (End of file)