Boost::Asio :为什么 async_write 在通过给定套接字发送缓冲区时会截断缓冲区?

Boost::Asio : Why does async_write truncate the buffer when sending it through the given socket?

我目前正在尝试设计一个相当简单的 boost::asio 服务器。我的第一个单元测试相当简单:发送一个 JSON 请求 {"COMMAND": "ADD_1", "VALUE" : 1} 并收到以下响应:

{
    "SUCCESS" : true,
    "VALUE" : 2
}

然而,从套接字 read 之后,回复被截断了一个字符:

Reply is: {
    "SUCCESS" : true,
    "VALUE" : 2

Process finished with exit code 0

写入套接字的代码相当简单,它是 class RequestContext 的一个成员函数:

    // Parses the request held in data_, builds a JSON reply and starts an
    // asynchronous write of that reply; the completion handler issues the
    // next read when the write reports no error.
    //
    // `length` is the parameter name only — this body never reads it.
    void RequestContext::DoWrite(std::size_t length)
    {
        JSONCPP_STRING parse_err;
        Json::Value json_req, json_resp;
        // Captured by the completion handler below so the object stays
        // referenced while the write is pending.
        auto self(this->shared_from_this());
        // NOTE(review): data_ is treated as a NUL-terminated C string here;
        // `length` is ignored, so this relies on a terminator being present.
        std::string client_req_str(data_);

        if (reader_->parse(client_req_str.c_str(),
                           client_req_str.c_str() +
                           client_req_str.length(),
                           &json_req, &parse_err))
        {
            try {
                // Get JSON response.
                json_resp = ProcessRequest(json_req);
                // NOTE(review): overwrites whatever "SUCCESS" value
                // ProcessRequest put into the response.
                json_resp["SUCCESS"] = true;
            } catch (const std::exception &ex) {
                // ProcessRequest threw: report the exception text.
                json_resp["SUCCESS"] = false;
                json_resp["ERRORS"] = std::string(ex.what());
            }
        } else {
            // If json parsing failed.
            json_resp["SUCCESS"] = false;
            json_resp["ERRORS"] = std::string(parse_err);
        }

        // NOTE(review): resp is a local variable, and the buffer passed to
        // async_write below only points at its storage. This function returns
        // right after starting the write, destroying resp — confirm against
        // Asio's buffer-lifetime requirements for async operations.
        std::string resp = Json::writeString(writer_, json_resp);

        boost::asio::async_write(socket_,
                                 boost::asio::buffer(&resp[0], resp.size()),
                                 [this, self]
                                 (boost::system::error_code ec,
                                  std::size_t bytes_xfered) {
                                    if (!ec)     DoRead();
                                 });
    }

我已经验证 ProcessRequest 返回的是正确的值,所以问题显然出在 async_write 上。我试过增大传给 async_write 的缓冲区的第二个参数(长度)的值,但似乎没有效果。我做错了什么?

可以在下面找到一个最小的可重现示例:

#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>
#include <json/json.h>

using boost::asio::ip::tcp;
using boost::system::error_code;
/// NOTE: This class exists exclusively for unit testing.
class RequestClass {
public:
    /**
     * Initialize class with the operand n that add_n()/sub_n() apply to the
     * request's "VALUE" field.
     *
     * @param n Value to add to / subtract from input values.
     */
    explicit RequestClass(int n) : n_(n) {}

    /// Operand added to / subtracted from the request's "VALUE" field.
    int n_;

    /**
     * Add n to the value in a JSON request.
     *
     * @param request JSON request, expected to carry the field "VALUE".
     * @return JSON response with "SUCCESS" = true and
     *         "VALUE" = [original_value] + n, or "SUCCESS" = false and an
     *         "ERRORS" message when the request has no "VALUE" member.
     */
    [[nodiscard]] Json::Value add_n(const Json::Value &request) const
    {
        Json::Value resp;

        // Test for the key itself rather than comparing against a NULL/0
        // sentinel, so that a legitimate "VALUE" of 0 is not rejected.
        if (request.isMember("VALUE")) {
            resp["SUCCESS"] = true;
            resp["VALUE"] = request["VALUE"].asInt() + n_;
        } else {
            resp["SUCCESS"] = false;
            resp["ERRORS"] = "Invalid value.";
        }
        return resp;
    }

    /**
     * Subtract n from the value in a JSON request.
     *
     * @param request JSON request, expected to carry the field "VALUE".
     * @return JSON response with "SUCCESS" = true and
     *         "VALUE" = [original_value] - n, or "SUCCESS" = false and an
     *         "ERRORS" message when the request has no "VALUE" member.
     */
    [[nodiscard]] Json::Value sub_n(const Json::Value &request) const
    {
        Json::Value resp;

        // Same presence check as add_n(): a "VALUE" of 0 is valid input.
        if (request.isMember("VALUE")) {
            resp["SUCCESS"] = true;
            resp["VALUE"] = request["VALUE"].asInt() - n_;
        } else {
            resp["SUCCESS"] = false;
            resp["ERRORS"] = "Invalid value.";
        }
        return resp;
    }
};

/// Callable form of a RequestClass command: receives the RequestClass
/// (by value, per this signature) and the request, returns the response.
using RequestClassMethod = std::function<Json::Value(RequestClass, const Json::Value &)>;

/**
 * Per-connection state: owns the socket, reads a request into data_,
 * dispatches it through the command map and writes the JSON response back.
 *
 * Instances are used through std::shared_ptr (the class derives from
 * enable_shared_from_this and every async handler captures a shared_ptr to
 * the object).
 *
 * @tparam RequestHandler Callable stored in the command map; invoked as
 *                        handler(*request_class_inst_, request).
 * @tparam RequestClass   Type of the object passed to each handler.
 */
template<class RequestHandler, class RequestClass>
class RequestContext :
    public std::enable_shared_from_this<RequestContext<RequestHandler,
                                                       RequestClass>>
{
public:
    /// Maps a "COMMAND" string to the handler that serves it.
    typedef std::map<std::string, RequestHandler> CommandMap;

    /**
     * @param socket             Connected socket; moved into this object.
     * @param commands           Command table; moved into this object.
     * @param request_class_inst Non-owning pointer handed to the handlers.
     */
    // NOTE(review): the CharReaderBuilder allocated with `new` below is never
    // deleted anywhere in this class.
    // NOTE(review): members are initialized in declaration order, and
    // request_class_inst_ is declared before commands_, unlike the order
    // written in this initializer list.
    RequestContext(tcp::socket socket, CommandMap commands,
                   RequestClass *request_class_inst)
        : socket_(std::move(socket))
        , commands_(std::move(commands))
        , request_class_inst_(request_class_inst)
        , reader_((new Json::CharReaderBuilder)->newCharReader())
    {}

    /// Starts the read/write cycle by issuing the first read.
    void Run()
    {
        DoRead();
    }

    /// Clears continue_. NOTE(review): nothing in this class reads the flag.
    void Kill()
    {
        continue_ = false;
    }

private:
    tcp::socket socket_;
    RequestClass *request_class_inst_;
    CommandMap commands_;
    /// Reads JSON.
    const std::unique_ptr<Json::CharReader> reader_;
    /// Writes JSON.
    Json::StreamWriterBuilder writer_;
    bool continue_ = true;
    /// Receive buffer. NOTE(review): never zero-initialized or explicitly
    /// NUL-terminated in this class.
    char data_[2048];

    /// Starts an asynchronous read into data_; on success hands the number of
    /// bytes read to DoWrite(). On error nothing further is scheduled.
    void DoRead()
    {
        auto self(this->shared_from_this());
        socket_.async_read_some(boost::asio::buffer(data_, 2048),
                                [this, self](error_code ec, std::size_t length)
                                {
                                  if (!ec)
                                  {
                                      DoWrite(length);
                                  }
                                });
    }

    /// Parses the request in data_, builds the JSON reply and starts an
    /// asynchronous write of it; the completion handler calls DoRead() when
    /// the write reports no error. `length` is not read by this body.
    void DoWrite(std::size_t length)
    {
        JSONCPP_STRING parse_err;
        Json::Value json_req, json_resp;
        auto self(this->shared_from_this());
        // NOTE(review): data_ is interpreted as a NUL-terminated C string and
        // `length` is ignored, so bytes left over from an earlier, longer
        // request (or uninitialized bytes) could be picked up here.
        std::string client_req_str(data_);

        if (reader_->parse(client_req_str.c_str(),
                           client_req_str.c_str() +
                           client_req_str.length(),
                           &json_req, &parse_err))
        {
            try {
                // Get JSON response.
                json_resp = ProcessRequest(json_req);
                // NOTE(review): overwrites the "SUCCESS" value that
                // ProcessRequest (or the handler) already set, including the
                // `false` used for "Invalid command.".
                json_resp["SUCCESS"] = true;
            } catch (const std::exception &ex) {
                // ProcessRequest threw: report the exception text.
                json_resp["SUCCESS"] = false;
                json_resp["ERRORS"] = std::string(ex.what());
            }
        } else {
            // If json parsing failed.
            json_resp["SUCCESS"] = false;
            json_resp["ERRORS"] = std::string(parse_err);
        }

        // NOTE(review): resp is a local variable, and the buffer passed to
        // async_write below only points at its storage. This function returns
        // right after starting the write, destroying resp — confirm against
        // Asio's buffer-lifetime requirements for async operations.
        std::string resp = Json::writeString(writer_, json_resp);

        boost::asio::async_write(socket_,
                                 boost::asio::buffer(&resp[0], resp.size()),
                                 [this, self]
                                 (boost::system::error_code ec,
                                  std::size_t bytes_xfered) {
                                    if (!ec)     DoRead();
                                 });
    }

    /// Looks up request["COMMAND"] in commands_ and runs the matching handler
    /// on *request_class_inst_; returns an error response for unknown commands.
    Json::Value ProcessRequest(Json::Value request)
    {
        Json::Value response;
        std::string command = request["COMMAND"].asString();

        // If command is not valid, give a response with an error.
        if(commands_.find(command) == commands_.end()) {
            response["SUCCESS"] = false;
            response["ERRORS"] = "Invalid command.";
        }
            // Otherwise, run the relevant handler.
        else {
            RequestHandler handler = commands_.at(command);
            response = handler(*request_class_inst_, request);
        }

        return response;
    }

};

template<class RequestHandler, class RequestClass>
class Server {
public:
    typedef std::map<std::string, RequestHandler> CommandMap;

    Server(boost::asio::io_context &io_context, short port,
           const CommandMap &commands,
           RequestClass *request_class_inst)
        : acceptor_(io_context, tcp::endpoint(tcp::v4(), port))
        , commands_(commands)
        , request_class_inst_(request_class_inst)
    {
        DoAccept();
    }

    ~Server()
    {
        Kill();
    }

    void Kill()
    {
        continue_ = false;
    }

private:
    tcp::acceptor acceptor_;
    bool continue_ = true;
    CommandMap commands_;
    RequestClass *request_class_inst_;

    void DoAccept()
    {
        acceptor_.async_accept(
            [this](boost::system::error_code ec, tcp::socket socket) {
                if (!ec)
                    std::make_shared<RequestContext<RequestHandler, RequestClass>>
                        (std::move(socket), commands_, request_class_inst_)->Run();
                DoAccept();
            });
    }
};

void RunServer(short port)
{
    boost::asio::io_context io_context;
    auto *request_inst = new RequestClass(1);
    std::map<std::string, RequestClassMethod> commands {
        {"ADD_1", std::mem_fn(&RequestClass::add_n)},
        {"SUB_1", std::mem_fn(&RequestClass::sub_n)}
    };
    Server<RequestClassMethod, RequestClass> s(io_context, port, commands,
                                               request_inst);

    io_context.run();
}

/// Launches RunServer(port) on a new thread and detaches it, so the call
/// returns immediately and the thread is never joined.
void RunServerInBackground(short port)
{
    std::thread([port] { RunServer(port); }).detach();
}



// Test driver: starts the server on port 5000 in a background thread, sends
// one ADD_1 request over a blocking client socket and prints the reply.
int main()
{
    try
    {
        // NOTE(review): the client connects right after the server thread is
        // launched; nothing here waits for the acceptor to be listening.
        RunServerInBackground(5000);
        boost::asio::io_context io_context;
        tcp::socket s(io_context);
        tcp::resolver resolver(io_context);

        boost::asio::connect(s, resolver.resolve("127.0.0.1", "5000"));
        char request[2048] = R"({"COMMAND": "ADD_1", "VALUE" : 1})";
        size_t request_length = std::strlen(request);
        boost::asio::write(s, boost::asio::buffer(request, request_length));
        // NOTE(review): reply is uninitialized and printed below as a C
        // string; nothing in this function NUL-terminates it.
        char reply[2048];
        // NOTE(review): the receive buffer is sized with request_length (the
        // length of the *request*), so at most that many reply bytes are read
        // regardless of how long the response is. reply_length is never used.
        size_t reply_length = boost::asio::read(s, boost::asio::buffer(reply, request_length));
        std::cout << "Reply is: ";
        std::cout << reply << std::endl;
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
    }

    return 0;
}

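传出缓冲区需要是 class 成员(就像 data_ 一样),这样才能保证它的生命周期一直持续到 async_write 完成。在您的代码中,resp 是 DoWrite 的局部变量:async_write 只是发起操作后立即返回,DoWrite 随即结束,resp 被销毁,而异步写入仍在使用指向它的缓冲区。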

您还可以借助 linter 或运行时检查工具(例如 ASAN/UBSAN 或 Valgrind)来发现此类问题。

更新

还有

size_t reply_length =
    boost::asio::read(s, boost::asio::buffer(reply, request_length));

错误地使用了 request_length。通常,请避免在任何时候手动指定缓冲区大小。

此外,您的协议没有提供消息分帧(framing),因此您实际上无法为后续请求保持同一个连接打开(您不知道一个完整的响应需要多少字节)。我将通过在第一个请求之后关闭连接来“修复”它,这样我们就有了一个可以运行的演示。

还有一个与 continue_ 标志有关的竞争条件,但我会把它留给读者作为练习。

当然,还要注意不要泄漏请求 class 的实例。

哦,我也切换到 Boost JSON,因为它看起来更适合:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/json.hpp>
#include <boost/json/src.hpp>
#include <chrono>
#include <functional>
#include <iomanip>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <string_view>
#include <thread>

using boost::asio::ip::tcp;
using boost::system::error_code;
namespace json = boost::json;
using Value    = json::object;

/// NOTE: This class exists exclusively for unit testing.
/// Arithmetic command provider: each public method applies one binary
/// operation between the request's "VALUE" field and the operand n_.
struct Sample {
    /// Right-hand operand used by every command.
    int n_;

    /// @return response with "VALUE" = request VALUE + n_.
    Value add_n(Value const& request) const { return impl(std::plus<>{}, request); }
    /// @return response with "VALUE" = request VALUE - n_.
    Value sub_n(Value const& request) const { return impl(std::minus<>{}, request); }
    /// @return response with "VALUE" = request VALUE * n_.
    Value mul_n(Value const& request) const { return impl(std::multiplies<>{}, request); }
    /// @return response with "VALUE" = request VALUE / n_, or an error
    ///         response when n_ is zero.
    Value div_n(Value const& request) const {
        // Integer division by zero is undefined behavior; report it as a
        // regular error response instead.
        if (n_ == 0)
            return Value{{"ERRORS", "Division by zero."}, {"SUCCESS", false}};
        return impl(std::divides<>{}, request);
    }

  private:
    /// Shared implementation: applies `op(VALUE, n_)` when the request has a
    /// "VALUE" member, otherwise returns the "Invalid value." error response.
    /// as_int64() is used to read the field, so a "VALUE" that is not stored
    /// as an int64 is reported through as_int64()'s own error path.
    template <typename Op> Value impl(Op op, Value const& req) const {
        if (!req.contains("VALUE"))
            return Value{{"ERRORS", "Invalid value."}, {"SUCCESS", false}};

        return Value{{"VALUE", op(req.at("VALUE").as_int64(), n_)},
                     {"SUCCESS", true}};
    }
};

/// Type-erased command handler: called with a Sample (taken by value, per
/// this signature) and the request object; returns the response object.
using RequestClassMethod =
    std::function<Value(Sample, Value const&)>;

/**
 * Per-connection state: reads a JSON request into data_, dispatches it to the
 * handler registered for its "COMMAND" and writes the serialized response.
 *
 * Instances are used through std::shared_ptr; every pending async handler
 * captures a shared_ptr to the object. The outgoing text lives in the member
 * resp_ so that the buffer given to async_write stays valid after DoWrite()
 * has returned.
 *
 * @tparam RequestHandler Callable stored in the command map; invoked as
 *                        handler(*request_class_inst_, request).
 * @tparam RequestClass   Type of the object passed to each handler.
 */
template <class RequestHandler, class RequestClass>
class RequestContext
    : public std::enable_shared_from_this<
          RequestContext<RequestHandler, RequestClass>> {
  public:
    /// Maps a "COMMAND" string to the handler that serves it.
    using CommandMap = std::map<std::string, RequestHandler>;

    /**
     * @param socket             Connected socket; moved into this object.
     * @param commands           Command table; moved into this object.
     * @param request_class_inst Non-owning pointer handed to the handlers;
     *                           may be null, in which case every command is
     *                           answered with "Invalid command.".
     */
    RequestContext(tcp::socket socket, CommandMap commands,
                   RequestClass* request_class_inst)
        : socket_(std::move(socket))
        , commands_(std::move(commands))
        , request_class_inst_(request_class_inst)
    {}

    /// Starts the read/write cycle by issuing the first read.
    void Run()  { DoRead(); }
    /// Clears the continue_ flag.
    void Kill() { continue_ = false; }

  private:
    tcp::socket   socket_;
    CommandMap    commands_;
    RequestClass* request_class_inst_;
    bool          continue_ = true;
    char          data_[2048];   ///< Receive buffer for one request.
    std::string   resp_;         ///< Outgoing response; must outlive the write.

    /// Starts an asynchronous read into data_; on success passes the number
    /// of bytes read to DoWrite(). On error nothing further is scheduled.
    void DoRead()
    {
        socket_.async_read_some(
            boost::asio::buffer(data_),
            [this, self = this->shared_from_this()](error_code ec, std::size_t length) {
                if (!ec) {
                    DoWrite(length);
                }
            });
    }

    /// Parses the first `length` bytes of data_ as a JSON object, computes
    /// the response and starts an asynchronous write of it. Any exception
    /// from parsing or handling is turned into an error response.
    void DoWrite(std::size_t length)
    {
        Value json_resp;

        try {
            auto json_req = json::parse({data_, length}).as_object();
            json_resp = ProcessRequest(std::move(json_req));
            // Handlers report their own status; only default to success when
            // none was given, so a reported failure is never overwritten.
            if (!json_resp.contains("SUCCESS"))
                json_resp["SUCCESS"] = true;
        } catch (std::exception const& ex) {
            json_resp = {{"SUCCESS", false}, {"ERRORS", ex.what()}};
        }

        resp_ = json::serialize(json_resp);

        boost::asio::async_write(socket_, boost::asio::buffer(resp_),
             [this, self = this->shared_from_this()](
                 error_code ec, std::size_t /*bytes_xfered*/) {
                 if (!ec)
                     DoRead();
             });
    }

    /// Runs the handler registered for request["COMMAND"]; returns the
    /// "Invalid command." error response when the command is missing or
    /// unknown, or when no request-class instance is available.
    Value ProcessRequest(Value request)
    {
        std::string cmdstr;
        if (request.contains("COMMAND")) {
            // at() instead of operator[] so the lookup never inserts a key.
            auto const& command = request.at("COMMAND").as_string();
            cmdstr.assign(command.data(), command.size());
        }

        // Single lookup; also avoids the C++20-only std::map::contains.
        auto const handler = commands_.find(cmdstr);
        if (handler == commands_.end() || !request_class_inst_)
            return Value{{"SUCCESS", false}, {"ERRORS","Invalid command."}};

        return handler->second(*request_class_inst_, request);
    }
};

template<class RequestHandler, class RequestClass>
class Server {
  public:
    using CommandMap = std::map<std::string, RequestHandler>;

    Server(boost::asio::io_context& io_context, uint16_t port,
           const CommandMap& commands, RequestClass* request_class_inst)
        : acceptor_(io_context, {{}, port})
        , commands_(commands)
        , request_class_inst_(request_class_inst)
    {
        DoAccept();
    }

    ~Server() { Kill(); }
    void Kill() { continue_ = false; }

private:
    tcp::acceptor acceptor_;
    bool          continue_ = true;
    CommandMap    commands_;
    RequestClass *request_class_inst_;

    void DoAccept()
    {
        acceptor_.async_accept(
            [this](error_code ec, tcp::socket socket) {
                if (!ec)
                    std::make_shared<
                        RequestContext<RequestHandler, RequestClass>>(
                        std::move(socket), commands_, request_class_inst_)
                        ->Run();
                DoAccept();
            });
    }
};

void RunServer(uint16_t port)
{
    boost::asio::io_context io_context;

    Server<RequestClassMethod, Sample> s(
        io_context, port,
        {{"ADD_2", std::mem_fn(&Sample::add_n)},
         {"SUB_2", std::mem_fn(&Sample::sub_n)},
         {"MUL_2", std::mem_fn(&Sample::mul_n)},
         {"DIV_2", std::mem_fn(&Sample::div_n)}},
        new Sample{2});

    io_context.run();
}

/// Launches RunServer(port) on a new thread and detaches it, so the call
/// returns immediately and the thread is never joined.
void RunServerInBackground(uint16_t port)
{
    std::thread([port] { RunServer(port); }).detach();
}

// Demo driver: starts the server on port 5000 in a background thread, sends
// one MUL_2 request, half-closes the connection and prints whatever the
// server sent back together with the final error status of the read.
int main() try {
    RunServerInBackground(5000);
    // Crude guard against connecting before the acceptor is listening.
    // Uses the standard, portable sleep instead of POSIX ::sleep, which no
    // header included by this file declares.
    std::this_thread::sleep_for(std::chrono::seconds(1));

    boost::asio::io_context io;
    tcp::socket s(io);
    s.connect({{}, 5000});

    std::string const request = R"({"COMMAND": "MUL_2", "VALUE" : 21})";
    std::cout << "Request: " << std::quoted(request, '\'') << std::endl;

    boost::asio::write(s, boost::asio::buffer(request));
    s.shutdown(tcp::socket::shutdown_send); // avoid framing problems

    // The read is sized by the whole reply buffer and reports its outcome
    // through ec instead of throwing; reply_length bounds what is printed.
    error_code ec;
    char reply[2048];
    size_t reply_length = boost::asio::read(s, boost::asio::buffer(reply), ec);

    std::cout << "Reply is: "
              << std::quoted(std::string_view(reply, reply_length), '\'')
              << " (" << ec.message() << ")" << std::endl;
} catch (std::exception const& e) {
    std::cerr << "Exception: " << e.what() << "\n";
}

输出:

Request: '{"COMMAND": "MUL_2", "VALUE" : 21}'
Reply is: '{"VALUE":42,"SUCCESS":true}' (End of file)