使用 boost::asio::streambuf 以二进制方式进行连续通信时失败?

Sequential communication failed with boost::asio::streambuf in binary way?

我正在使用 boost::asio 编写网络程序,想实现一个简单的字符串收发协议:发送方先把字符串的大小发送给接收方,然后再把实际的字符串发送给接收方。

具体来说,我设计了如下的两步协议。

我按下面的代码实现了上述协议。只执行一次该协议时可以正常工作;但是,如果连续执行多次(例如两次),接收方得到的字符串大小就是错误的。

第一次:1365 字节。

第二次:779073 字节。(接收方读到的不是 779073,而是 7790)

我发现os << data_size不是二进制的。 “779073”仅作为 6 字节字符串发送。但是接收方只读取了其中的 4 个字节。 如何使用 boost::asio 和 boost::asio::streambuf 发送二进制数据和接收二进制数据?

接收方

// Receiver side of the question's protocol (this is the code that misbehaves).
// `socket` is already defined elsewhere.
// ** first step: recv data size
// Reads exactly sizeof(uint32_t) bytes from the socket into `buf`.
boost::asio::streambuf buf;
boost::asio::read(
   socket,
   buf, 
   boost::asio::transfer_exactly(sizeof(uint32_t))
);
std::istream iss(&buf);
uint32_t read_len;
// NOTE: operator>> is formatted (text) extraction. It parses decimal digits
// from the bytes that were read; it does not reinterpret them as a binary
// uint32_t. Only sizeof(uint32_t) bytes were read above, so a size whose
// decimal text is longer than that is cut short.
iss >>  read_len;

// ** second step: recv payload based on the data size
// `read_len` is first used as the number of bytes to read, then overwritten
// with the byte count returned by read().
// NOTE(review): `error` is not declared in this snippet; presumably a
// boost::system::error_code defined elsewhere -- confirm.
boost::asio::streambuf buf2;
read_len = boost::asio::read(socket, buf2, 
boost::asio::transfer_exactly(read_len), error);
cout << "  read "<< read_len << " bytes payload" << endl; 
std::istream is_payload(&buf2);
std::string str;
// Formatted extraction again: skips leading whitespace and stops at the next
// whitespace character, so a payload containing whitespace is not extracted
// in full by this single call.
is_payload >> str;
cout << str << endl; 

发送方

// Sender side of the question's protocol (this is the code that misbehaves).
// `socket` is already defined elsewhere.
string str=...;   // some string to be sent
// ** first step: tell the string size to the receiver
uint32_t data_size = str.size();
boost::asio::streambuf send_buf;
std::ostream os(&send_buf);
// NOTE: operator<< is formatted (text) insertion. It writes the decimal digits
// of data_size (e.g. "779073" is six characters), not the raw bytes of the
// uint32_t, so the number of bytes sent varies with the value.
os << data_size;
size_t sent_byte = boost::asio::write(socket, send_buf.data());
cout << sent_byte << endl; // debug purpose

// ** second step: send the actual string (payload)
// Sends data_size bytes starting at the first character of `str`.
sent_byte = boost::asio::write(socket, boost::asio::buffer(reinterpret_cast<const char*>(&str[0]), data_size));
cout << sent_byte << endl; // debug purpose

可以发送二进制大小,但这需要您考虑设备和操作系统之间的架构差异¹。

下面是我把这个协议实际编码成可复用代码的一种做法:

//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>

namespace ba = boost::asio;
using ba::ip::tcp;
using error_code = boost::system::error_code;

namespace Protocol { // your library

    using net_size_t = boost::endian::big_int32_t; // This protocol uses Big-endian network byte order

    /// CRTP base shared by the composed operations in this namespace.
    ///
    /// Stores a decayed copy of the caller's completion token, constructs a
    /// ba::async_completion from that copy, and exposes the allocator and
    /// executor associated with the resulting completion handler.
    ///
    /// @tparam Derived  concrete operation type (CRTP); also the argument of
    ///                  std::enable_shared_from_this.
    /// @tparam Token    completion token type as supplied by the caller.
    /// @tparam Sig      completion signature; defaults to void(error_code, size_t).
    template <typename Derived, typename Token, typename Sig = void(error_code, size_t)> 
    struct base_async_op : std::enable_shared_from_this<Derived> {
        using base_type = base_async_op<Derived, Token, Sig>;

        /// Forwards the caller's token into the stored `_token`.
        template <typename DeducedToken>
        base_async_op(DeducedToken &&token) : _token(std::forward<DeducedToken>(token)) {}

        using _Token   = std::decay_t<Token>;
        using _Init    = ba::async_completion<_Token, Sig>;
        using _Handler = typename _Init::completion_handler_type;

        // Declaration order matters: `_init` is constructed from `_token`, so
        // `_token` has to be declared (and therefore initialized) first.
        _Token _token;
        _Init _init {_token};

        /// Returns the allocator associated with the completion handler.
        auto get_allocator() const noexcept { 
            return (boost::asio::get_associated_allocator)(_init.completion_handler);
        }
        using executor_type = ba::associated_executor_t<_Handler>;
        /// Returns the executor associated with the completion handler.
        executor_type get_executor() const noexcept {
            return (boost::asio::get_associated_executor)(_init.completion_handler);
        }

        /// CRTP down-casts to the concrete operation.
        Derived& derived()             { return static_cast<Derived&>(*this);       } 
        Derived const& derived() const { return static_cast<Derived const&>(*this); } 

        /// Binds `f` to derived().get_executor() via ba::bind_executor.
        /// Going through derived() means a get_executor() declared further down
        /// the hierarchy is the one that gets used.
        template <typename F>
        auto wrap(F&& f) const {
            //std::cout << "WRAP: " << typeid(derived().get_executor()).name() << "\n";
            return ba::bind_executor(derived().get_executor(), std::forward<F>(f));
        }
    };

    /// Base for operations that act on a stream.
    ///
    /// Adds a reference to the stream on top of base_async_op and hides the
    /// base's get_executor()/executor_type with versions that additionally pass
    /// the stream's executor to get_associated_executor.
    ///
    /// @tparam Derived  concrete operation type (CRTP), forwarded to base_async_op.
    /// @tparam Stream   stream type; only a reference to it is stored.
    /// @tparam Token    completion token type as supplied by the caller.
    /// @tparam Sig      completion signature; defaults to void(error_code, size_t).
    template <typename Derived, typename Stream, typename Token, typename Sig = void(error_code, size_t)> 
    struct stream_async_op : base_async_op<Derived, Token, Sig> {
        // Hides base_async_op::base_type, so derived operations that write
        // `op::base_type` refer to this class.
        using base_type = stream_async_op<Derived, Stream, Token, Sig>;

        /// Stores a reference to `s` and forwards `token` to base_async_op.
        template <typename DeducedToken>
        stream_async_op(Stream& s, DeducedToken &&token) : base_async_op<Derived, Token, Sig>(std::forward<DeducedToken>(token)), _stream(s)  {}

        // Non-owning: the stream must remain valid for as long as this object uses it.
        Stream& _stream;

        using executor_type = ba::associated_executor_t<typename stream_async_op::_Handler, decltype(std::declval<Stream>().get_executor())>;
        /// Returns the executor associated with the completion handler, passing
        /// the stream's executor as the second argument of get_associated_executor.
        executor_type get_executor() const noexcept {
            return (boost::asio::get_associated_executor)(this->_init.completion_handler, _stream.get_executor());
        }
    };

    /// Asynchronously sends one length-prefixed message over `s`.
    ///
    /// The frame consists of a net_size_t holding ba::buffer_size(message_buffer)
    /// followed by the bytes described by message_buffer; both parts are handed
    /// to a single ba::async_write call.
    ///
    /// @param s               stream to write to (held by reference by the operation).
    /// @param message_buffer  buffer describing the payload bytes.
    /// @param token           completion token; the handler is invoked with the
    ///                        error_code and byte count reported by ba::async_write.
    /// @return the value produced by the completion token's async result.
    template <typename AsyncStream, typename Buffer, typename Token>
    auto async_transmit(AsyncStream& s, Buffer message_buffer, Token&& token) {

        struct transmit_op : stream_async_op<transmit_op, AsyncStream, Token> {
            using transmit_op::base_type::base_type;
            using transmit_op::base_type::_init;
            using transmit_op::base_type::_stream;

            // Length prefix; a member so that it stays alive while the write is in flight.
            net_size_t _length[1];

            auto run(Buffer payload) {
                // Shared ownership captured by the handler keeps *this alive until it has run.
                auto keep_alive = this->shared_from_this();
                _length[0] = ba::buffer_size(payload);

                // Header first, then the payload, as one gather-write.
                std::vector<ba::const_buffer> frame { ba::buffer(_length), payload };

                auto on_written = [keep_alive, this](error_code ec, size_t transferred) {
                    _init.completion_handler(ec, transferred);
                };
                ba::async_write(_stream, frame, this->wrap(std::move(on_written)));

                return _init.result.get();
            }
        };

        auto operation = std::make_shared<transmit_op>(s, std::forward<Token>(token));
        return operation->run(message_buffer);
    }

    /// Asynchronously receives one length-prefixed message from `s` into `output`.
    ///
    /// First reads a net_size_t length prefix, then reads exactly that many bytes
    /// into ba::dynamic_buffer(output).
    ///
    /// @param s       stream to read from (held by reference by the operation).
    /// @param output  caller-owned storage for the payload; it is captured by
    ///                reference, so it must outlive the operation.
    /// @param token   completion token with signature void(error_code, size_t).
    ///                On success the size is the payload byte count; if reading
    ///                or validating the prefix fails, it is the byte count of the
    ///                prefix read.
    /// @return the value produced by the completion token's async result.
    ///
    /// A negative length prefix completes the operation with
    /// ba::error::message_size instead of starting the payload read.
    template <typename AsyncStream, typename Buffer, typename Token>
    auto async_receive(AsyncStream& s, Buffer& output, Token&& token) {

        struct op : stream_async_op<op, AsyncStream, Token> {
            using op::base_type::base_type;
            using op::base_type::_init;
            using op::base_type::_stream;

            // Length prefix; a member so that it stays alive while the read is in flight.
            net_size_t _length[1] = {0};

            auto run(Buffer& output) {
                // Shared ownership captured by the handlers keeps *this alive until they have run.
                auto self = this->shared_from_this();

                ba::async_read(_stream, ba::buffer(_length), this->wrap([self, this, &output](error_code ec, size_t transferred) {
                    // net_size_t is signed and the value comes from the peer: a negative
                    // length would convert to an enormous size_t in transfer_exactly.
                    const net_size_t::value_type length = _length[0];
                    if (!ec && length < 0)
                        ec = ba::error::make_error_code(ba::error::message_size);

                    if (ec)
                        _init.completion_handler(ec, transferred);
                    else
                        ba::async_read(_stream, ba::dynamic_buffer(output), ba::transfer_exactly(static_cast<size_t>(length)),
                            this->wrap([self, this](error_code ec, size_t transferred) { 
                                _init.completion_handler(ec, transferred);
                            }));
                }));

                return _init.result.get();
            }
        };

        return std::make_shared<op>(s, std::forward<Token>(token))->run(output);
    }

    /// Asynchronously receives one length-prefixed message from `s` and delivers
    /// it to the completion handler by value.
    ///
    /// First reads a net_size_t length prefix, then reads exactly that many bytes
    /// into an operation-owned `Output` through ba::dynamic_buffer.
    ///
    /// @tparam Output  type that owns the received payload; defaults to std::string.
    /// @param s        stream to read from (held by reference by the operation).
    /// @param token    completion token with signature void(error_code, Output).
    ///                 The Output is moved into the handler in every case,
    ///                 including failures.
    /// @return the value produced by the completion token's async result.
    ///
    /// A negative length prefix completes the operation with
    /// ba::error::message_size instead of starting the payload read.
    template <typename Output = std::string, typename AsyncStream, typename Token>
    auto async_receive(AsyncStream& s, Token&& token) {

        struct op : stream_async_op<op, AsyncStream, Token, void(error_code, Output)> {
            using op::base_type::base_type;
            using op::base_type::_init;
            using op::base_type::_stream;

            Output _output;
            // Length prefix; a member so that it stays alive while the read is in flight.
            net_size_t _length[1] = {0};

            auto run() {
                // Shared ownership captured by the handlers keeps *this alive until they have run.
                auto self = this->shared_from_this();

                // Handlers go through wrap(), as in the other operations of this
                // namespace, so they are bound to the operation's executor.
                ba::async_read(_stream, ba::buffer(_length), this->wrap([self, this](error_code ec, size_t) {
                    // net_size_t is signed and the value comes from the peer: a negative
                    // length would convert to an enormous size_t in transfer_exactly.
                    const net_size_t::value_type length = _length[0];
                    if (!ec && length < 0)
                        ec = ba::error::make_error_code(ba::error::message_size);

                    if (ec)
                        _init.completion_handler(ec, std::move(_output));
                    else
                        ba::async_read(_stream, ba::dynamic_buffer(_output), ba::transfer_exactly(static_cast<size_t>(length)),
                            this->wrap([self, this](error_code ec, size_t) {
                                _init.completion_handler(ec, std::move(_output));
                            }));
                }));

                return _init.result.get();
            }
        };

        return std::make_shared<op>(s, std::forward<Token>(token))->run();
    }

} // Protocol

#include <iostream>
#include <iomanip>

/// Demo client: connects to port 6767, sends two length-prefixed strings one
/// after the other, and receives the peer's length-prefixed replies.
int main() {

    ba::io_context io;
    tcp::socket sock(io);
    sock.connect({tcp::v4(), 6767});

    // Builds a completion handler that logs the outcome of the step `label`
    // and runs `next` only if that step succeeded.
    auto then = [](auto label, auto next = []{}) {
        return [=](error_code ec, size_t transferred) {
            std::cout << label << " completed (" << transferred << ", " << ec.message() << ")\n";
            if (ec)
                return;
            next();
        };
    };
    // Terminal handler: logs the outcome and chains nothing afterwards.
    auto finish = [=](auto label) { return then(label, []{}); };

    // send chain: "World" is only sent once "Hello" has been written
    std::string hello = "Hello", world = "World";
    auto send_world = [&] { Protocol::async_transmit(sock, ba::buffer(world), finish("Send world")); };
    Protocol::async_transmit(sock, ba::buffer(hello), then("Send hello", send_world));
#ifndef DEMO_USE_FUTURE
    // receive chain: the second object is only read once the first has arrived
    std::string object1, object2;
    auto read_second = [&] { Protocol::async_receive(sock, object2, finish("Read object 2")); };
    Protocol::async_receive(sock, object1, then("Read object 1", read_second));

    io.run();

    std::cout << "Response object 1: " << std::quoted(object1) << "\n";
    std::cout << "Response object 2: " << std::quoted(object2) << "\n";
#else
    // alternative completion mechanism: obtain the received object via a future
    std::future<std::string> fut = Protocol::async_receive(sock, ba::use_future);
    io.run();

    std::cout << "Response object: " << std::quoted(fut.get()) << "\n";
#endif

}

当与测试服务器对话时:

xxd -p -r <<< '0000 0006 4e6f 2077 6179 0000 0005 4a6f 73c3 a90a' | netcat -l -p 6767 | xxd

程序打印

Send hello completed (9, Success)
Send world completed (9, Success)
Read object 1 completed (6, Success)
Read object 2 completed (5, Success)
Response object 1: "No way"
Response object 2: "José"

并且 netcat 端打印:

00000000: 0000 0005 4865 6c6c 6f00 0000 0557 6f72  ....Hello....Wor
00000010: 6c64                                     ld

启用处理程序跟踪允许您使用 handlerviz.pl 可视化调用链:

注意:可以把 big_int32_t 改为 little_int32_t,而无需做任何其他改动。当然,服务器端发送的数据也要相应修改以保持一致:

xxd -p -r <<< '0600 0000 4e6f 2077 6179 0500 0000 4a6f 73c3 a90a' | netcat -l -p 6767 | xxd

¹ 字节序(endianness);例如可以使用 Boost.Endian,或者 ::ntohs、::ntohl、::htons、::htonl。