以二进制方式 boost::asio::streambuf 的顺序通信失败?
Sequential communication failed with boost::asio::streambuf in binary way?
使用 boost::asio
,我正在编写网络内容。
我试图构建一个简单的发送和接收字符串协议。
发送方首先将字符串大小发送给接收方。然后发送方将实际的字符串发送给接收方。
我特别设计了以下两个协议。
持有字符串的发送方将其发送给接收方。收到后,接收方显示字符串。
按顺序执行上述协议(两次)。
我构建了上面的协议,如下所示:
如果我执行一次这个协议,就可以正常工作。
但是,如果我多次执行此协议(例如两次),则
接收方收到的字符串大小错误。
第一次:1365 字节。
第二次:779073 字节。 (只读不是 779073 而是 7790)
我发现os << data_size
不是二进制的。 “779073”仅作为 6 字节字符串发送。但是接收方只读取了其中的 4 个字节。
如何使用 boost::asio 和 boost::asio::streambuf 发送二进制数据和接收二进制数据?
接收器
// socket is already defined
// ** first step: recv data size
boost::asio::streambuf buf;
boost::asio::read(
socket,
buf,
boost::asio::transfer_exactly(sizeof(uint32_t))
);
std::istream iss(&buf);
uint32_t read_len;
iss >> read_len;
// ** second step: recv payload based on the data size
boost::asio::streambuf buf2;
read_len = boost::asio::read(socket, buf2,
boost::asio::transfer_exactly(read_len), error);
cout << " read "<< read_len << " bytes payload" << endl;
std::istream is_payload(&buf2);
std::string str;
is_payload >> str;
cout << str << endl;
发件人
// socket is already defined
string str=...; // some string to be sent
// ** first step: tell the string size to the reciever
uint32_t data_size = str.size();
boost::asio::streambuf send_buf;
std::ostream os(&send_buf);
os << data_size;
size_t sent_byte = boost::asio::write(socket, send_buf.data());
cout << sent_byte << endl; // debug purpose
// ** second step: send the actual string (payload)
sent_byte = boost::asio::write(socket, boost::asio::buffer(reinterpret_cast<const char*>(&str[0]), data_size));
cout << sent_byte << endl; // debug purpose
您可以发送二进制大小,但这需要您考虑设备和操作系统之间的架构差异¹。
以下是我对可重用协议实际编码的看法:
//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
namespace ba = boost::asio;
using ba::ip::tcp;
using error_code = boost::system::error_code;
namespace Protocol { // your library
using net_size_t = boost::endian::big_int32_t; // This protocol uses Big-endian network byte order
template <typename Derived, typename Token, typename Sig = void(error_code, size_t)>
struct base_async_op : std::enable_shared_from_this<Derived> {
using base_type = base_async_op<Derived, Token, Sig>;
template <typename DeducedToken>
base_async_op(DeducedToken &&token) : _token(std::forward<DeducedToken>(token)) {}
using _Token = std::decay_t<Token>;
using _Init = ba::async_completion<_Token, Sig>;
using _Handler = typename _Init::completion_handler_type;
_Token _token;
_Init _init {_token};
auto get_allocator() const noexcept {
return (boost::asio::get_associated_allocator)(_init.completion_handler);
}
using executor_type = ba::associated_executor_t<_Handler>;
executor_type get_executor() const noexcept {
return (boost::asio::get_associated_executor)(_init.completion_handler);
}
Derived& derived() { return static_cast<Derived&>(*this); }
Derived const& derived() const { return static_cast<Derived const&>(*this); }
template <typename F>
auto wrap(F&& f) const {
//std::cout << "WRAP: " << typeid(derived().get_executor()).name() << "\n";
return ba::bind_executor(derived().get_executor(), std::forward<F>(f));
}
};
template <typename Derived, typename Stream, typename Token, typename Sig = void(error_code, size_t)>
struct stream_async_op : base_async_op<Derived, Token, Sig> {
using base_type = stream_async_op<Derived, Stream, Token, Sig>;
template <typename DeducedToken>
stream_async_op(Stream& s, DeducedToken &&token) : base_async_op<Derived, Token, Sig>(std::forward<DeducedToken>(token)), _stream(s) {}
Stream& _stream;
using executor_type = ba::associated_executor_t<typename stream_async_op::_Handler, decltype(std::declval<Stream>().get_executor())>;
executor_type get_executor() const noexcept {
return (boost::asio::get_associated_executor)(this->_init.completion_handler, _stream.get_executor());
}
};
template <typename AsyncStream, typename Buffer, typename Token>
auto async_transmit(AsyncStream& s, Buffer message_buffer, Token&& token) {
struct op : stream_async_op<op, AsyncStream, Token> {
using op::base_type::base_type;
using op::base_type::_init;
using op::base_type::_stream;
net_size_t _length[1];
auto run(Buffer buffer) {
auto self = this->shared_from_this();
_length[0] = ba::buffer_size(buffer);
ba::async_write(_stream, std::vector<ba::const_buffer> { ba::buffer(_length), buffer },
this->wrap([self,this](error_code ec, size_t transferred) { _init.completion_handler(ec, transferred); }));
return _init.result.get();
}
};
return std::make_shared<op>(s, std::forward<Token>(token))->run(message_buffer);
}
template <typename AsyncStream, typename Buffer, typename Token>
auto async_receive(AsyncStream& s, Buffer& output, Token&& token) {
struct op : stream_async_op<op, AsyncStream, Token> {
using op::base_type::base_type;
using op::base_type::_init;
using op::base_type::_stream;
net_size_t _length[1] = {0};
auto run(Buffer& output) {
auto self = this->shared_from_this();
ba::async_read(_stream, ba::buffer(_length), this->wrap([self, this, &output](error_code ec, size_t transferred) {
if (ec)
_init.completion_handler(ec, transferred);
else
ba::async_read(_stream, ba::dynamic_buffer(output), ba::transfer_exactly(_length[0]),
this->wrap([self, this](error_code ec, size_t transferred) {
_init.completion_handler(ec, transferred);
}));
}));
return _init.result.get();
}
};
return std::make_shared<op>(s, std::forward<Token>(token))->run(output);
}
template <typename Output = std::string, typename AsyncStream, typename Token>
auto async_receive(AsyncStream& s, Token&& token) {
struct op : stream_async_op<op, AsyncStream, Token, void(error_code, Output)> {
using op::base_type::base_type;
using op::base_type::_init;
using op::base_type::_stream;
Output _output;
net_size_t _length[1] = {0};
auto run() {
auto self = this->shared_from_this();
ba::async_read(_stream, ba::buffer(_length), [self,this](error_code ec, size_t) {
if (ec)
_init.completion_handler(ec, std::move(_output));
else
ba::async_read(_stream, ba::dynamic_buffer(_output), ba::transfer_exactly(_length[0]),
[self,this](error_code ec, size_t) { _init.completion_handler(ec, std::move(_output)); });
});
return _init.result.get();
}
};
return std::make_shared<op>(s, std::forward<Token>(token))->run();
}
} // Protocol
#include <iostream>
#include <iomanip>
int main() {
ba::io_context io;
tcp::socket sock(io);
sock.connect({tcp::v4(), 6767});
auto cont = [](auto name, auto continuation = []{}) { return [=](error_code ec, size_t transferred) {
std::cout << name << " completed (" << transferred << ", " << ec.message() << ")\n";
if (!ec) continuation();
}; };
auto report = [=](auto name) { return cont(name, []{}); };
// send chain
std::string hello = "Hello", world = "World";
Protocol::async_transmit(sock, ba::buffer(hello),
cont("Send hello", [&] { Protocol::async_transmit(sock, ba::buffer(world), report("Send world")); }
));
#ifndef DEMO_USE_FUTURE
// receive chain
std::string object1, object2;
Protocol::async_receive(sock, object1,
cont("Read object 1", [&] { Protocol::async_receive(sock, object2, report("Read object 2")); }));
io.run();
std::cout << "Response object 1: " << std::quoted(object1) << "\n";
std::cout << "Response object 2: " << std::quoted(object2) << "\n";
#else
// also possible, alternative completion mechanisms:
std::future<std::string> fut = Protocol::async_receive(sock, ba::use_future);
io.run();
std::cout << "Response object: " << std::quoted(fut.get()) << "\n";
#endif
}
当与测试服务器对话时:
xxd -p -r <<< '0000 0006 4e6f 2077 6179 0000 0005 4a6f 73c3 a90a' | netcat -l -p 6767 | xxd
程序打印
Send hello completed (9, Success)
Send world completed (9, Success)
Read object 1 completed (6, Success)
Read object 2 completed (5, Success)
Response object 1: "No way"
Response object 2: "José"
并且 netcat 端打印:
00000000: 0000 0005 4865 6c6c 6f00 0000 0557 6f72 ....Hello....Wor
00000010: 6c64 ld
启用处理程序跟踪允许您使用 handlerviz.pl
可视化调用链:
Note You can change big_int32_t
to little_int32_t
without any further change. Of course, you should change the payload on the server side to match:
xxd -p -r <<< '0600 0000 4e6f 2077 6179 0500 0000 4a6f 73c3 a90a' | netcat -l -p 6767 | xxd
¹ 字节顺序,例如使用 Boost Endian 或 ::ntohs
、::ntohl
、::htons
和 ::htonl
使用 boost::asio
,我正在编写网络内容。
我试图构建一个简单的发送和接收字符串协议。
发送方首先将字符串大小发送给接收方。然后发送方将实际的字符串发送给接收方。
我特别设计了以下两个协议。
持有字符串的发送方将其发送给接收方。收到后,接收方显示字符串。
按顺序执行上述协议(两次)。
我构建了上面的协议,如下所示: 如果我执行一次这个协议,就可以正常工作。 但是,如果我多次执行此协议(例如两次),则 接收方收到的字符串大小错误。
第一次:1365 字节。
第二次:779073 字节。 (只读不是 779073 而是 7790)
我发现os << data_size
不是二进制的。 “779073”仅作为 6 字节字符串发送。但是接收方只读取了其中的 4 个字节。
如何使用 boost::asio 和 boost::asio::streambuf 发送二进制数据和接收二进制数据?
接收器
// socket is already defined
// ** first step: recv data size
boost::asio::streambuf buf;
boost::asio::read(
socket,
buf,
boost::asio::transfer_exactly(sizeof(uint32_t))
);
std::istream iss(&buf);
uint32_t read_len;
iss >> read_len;
// ** second step: recv payload based on the data size
boost::asio::streambuf buf2;
read_len = boost::asio::read(socket, buf2,
boost::asio::transfer_exactly(read_len), error);
cout << " read "<< read_len << " bytes payload" << endl;
std::istream is_payload(&buf2);
std::string str;
is_payload >> str;
cout << str << endl;
发件人
// socket is already defined
string str=...; // some string to be sent
// ** first step: tell the string size to the reciever
uint32_t data_size = str.size();
boost::asio::streambuf send_buf;
std::ostream os(&send_buf);
os << data_size;
size_t sent_byte = boost::asio::write(socket, send_buf.data());
cout << sent_byte << endl; // debug purpose
// ** second step: send the actual string (payload)
sent_byte = boost::asio::write(socket, boost::asio::buffer(reinterpret_cast<const char*>(&str[0]), data_size));
cout << sent_byte << endl; // debug purpose
您可以发送二进制大小,但这需要您考虑设备和操作系统之间的架构差异¹。
以下是我对可重用协议实际编码的看法:
//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
namespace ba = boost::asio;
using ba::ip::tcp;
using error_code = boost::system::error_code;
namespace Protocol { // your library
using net_size_t = boost::endian::big_int32_t; // This protocol uses Big-endian network byte order
template <typename Derived, typename Token, typename Sig = void(error_code, size_t)>
struct base_async_op : std::enable_shared_from_this<Derived> {
using base_type = base_async_op<Derived, Token, Sig>;
template <typename DeducedToken>
base_async_op(DeducedToken &&token) : _token(std::forward<DeducedToken>(token)) {}
using _Token = std::decay_t<Token>;
using _Init = ba::async_completion<_Token, Sig>;
using _Handler = typename _Init::completion_handler_type;
_Token _token;
_Init _init {_token};
auto get_allocator() const noexcept {
return (boost::asio::get_associated_allocator)(_init.completion_handler);
}
using executor_type = ba::associated_executor_t<_Handler>;
executor_type get_executor() const noexcept {
return (boost::asio::get_associated_executor)(_init.completion_handler);
}
Derived& derived() { return static_cast<Derived&>(*this); }
Derived const& derived() const { return static_cast<Derived const&>(*this); }
template <typename F>
auto wrap(F&& f) const {
//std::cout << "WRAP: " << typeid(derived().get_executor()).name() << "\n";
return ba::bind_executor(derived().get_executor(), std::forward<F>(f));
}
};
template <typename Derived, typename Stream, typename Token, typename Sig = void(error_code, size_t)>
struct stream_async_op : base_async_op<Derived, Token, Sig> {
using base_type = stream_async_op<Derived, Stream, Token, Sig>;
template <typename DeducedToken>
stream_async_op(Stream& s, DeducedToken &&token) : base_async_op<Derived, Token, Sig>(std::forward<DeducedToken>(token)), _stream(s) {}
Stream& _stream;
using executor_type = ba::associated_executor_t<typename stream_async_op::_Handler, decltype(std::declval<Stream>().get_executor())>;
executor_type get_executor() const noexcept {
return (boost::asio::get_associated_executor)(this->_init.completion_handler, _stream.get_executor());
}
};
template <typename AsyncStream, typename Buffer, typename Token>
auto async_transmit(AsyncStream& s, Buffer message_buffer, Token&& token) {
struct op : stream_async_op<op, AsyncStream, Token> {
using op::base_type::base_type;
using op::base_type::_init;
using op::base_type::_stream;
net_size_t _length[1];
auto run(Buffer buffer) {
auto self = this->shared_from_this();
_length[0] = ba::buffer_size(buffer);
ba::async_write(_stream, std::vector<ba::const_buffer> { ba::buffer(_length), buffer },
this->wrap([self,this](error_code ec, size_t transferred) { _init.completion_handler(ec, transferred); }));
return _init.result.get();
}
};
return std::make_shared<op>(s, std::forward<Token>(token))->run(message_buffer);
}
template <typename AsyncStream, typename Buffer, typename Token>
auto async_receive(AsyncStream& s, Buffer& output, Token&& token) {
struct op : stream_async_op<op, AsyncStream, Token> {
using op::base_type::base_type;
using op::base_type::_init;
using op::base_type::_stream;
net_size_t _length[1] = {0};
auto run(Buffer& output) {
auto self = this->shared_from_this();
ba::async_read(_stream, ba::buffer(_length), this->wrap([self, this, &output](error_code ec, size_t transferred) {
if (ec)
_init.completion_handler(ec, transferred);
else
ba::async_read(_stream, ba::dynamic_buffer(output), ba::transfer_exactly(_length[0]),
this->wrap([self, this](error_code ec, size_t transferred) {
_init.completion_handler(ec, transferred);
}));
}));
return _init.result.get();
}
};
return std::make_shared<op>(s, std::forward<Token>(token))->run(output);
}
template <typename Output = std::string, typename AsyncStream, typename Token>
auto async_receive(AsyncStream& s, Token&& token) {
struct op : stream_async_op<op, AsyncStream, Token, void(error_code, Output)> {
using op::base_type::base_type;
using op::base_type::_init;
using op::base_type::_stream;
Output _output;
net_size_t _length[1] = {0};
auto run() {
auto self = this->shared_from_this();
ba::async_read(_stream, ba::buffer(_length), [self,this](error_code ec, size_t) {
if (ec)
_init.completion_handler(ec, std::move(_output));
else
ba::async_read(_stream, ba::dynamic_buffer(_output), ba::transfer_exactly(_length[0]),
[self,this](error_code ec, size_t) { _init.completion_handler(ec, std::move(_output)); });
});
return _init.result.get();
}
};
return std::make_shared<op>(s, std::forward<Token>(token))->run();
}
} // Protocol
#include <iostream>
#include <iomanip>
int main() {
ba::io_context io;
tcp::socket sock(io);
sock.connect({tcp::v4(), 6767});
auto cont = [](auto name, auto continuation = []{}) { return [=](error_code ec, size_t transferred) {
std::cout << name << " completed (" << transferred << ", " << ec.message() << ")\n";
if (!ec) continuation();
}; };
auto report = [=](auto name) { return cont(name, []{}); };
// send chain
std::string hello = "Hello", world = "World";
Protocol::async_transmit(sock, ba::buffer(hello),
cont("Send hello", [&] { Protocol::async_transmit(sock, ba::buffer(world), report("Send world")); }
));
#ifndef DEMO_USE_FUTURE
// receive chain
std::string object1, object2;
Protocol::async_receive(sock, object1,
cont("Read object 1", [&] { Protocol::async_receive(sock, object2, report("Read object 2")); }));
io.run();
std::cout << "Response object 1: " << std::quoted(object1) << "\n";
std::cout << "Response object 2: " << std::quoted(object2) << "\n";
#else
// also possible, alternative completion mechanisms:
std::future<std::string> fut = Protocol::async_receive(sock, ba::use_future);
io.run();
std::cout << "Response object: " << std::quoted(fut.get()) << "\n";
#endif
}
当与测试服务器对话时:
xxd -p -r <<< '0000 0006 4e6f 2077 6179 0000 0005 4a6f 73c3 a90a' | netcat -l -p 6767 | xxd
程序打印
Send hello completed (9, Success)
Send world completed (9, Success)
Read object 1 completed (6, Success)
Read object 2 completed (5, Success)
Response object 1: "No way"
Response object 2: "José"
并且 netcat 端打印:
00000000: 0000 0005 4865 6c6c 6f00 0000 0557 6f72 ....Hello....Wor
00000010: 6c64 ld
启用处理程序跟踪允许您使用 handlerviz.pl
可视化调用链:
Note You can change
big_int32_t
tolittle_int32_t
without any further change. Of course, you should change the payload on the server side to match:xxd -p -r <<< '0600 0000 4e6f 2077 6179 0500 0000 4a6f 73c3 a90a' | netcat -l -p 6767 | xxd
¹ 字节顺序,例如使用 Boost Endian 或 ::ntohs
、::ntohl
、::htons
和 ::htonl