如何使用 Boost ASIO 实现 IPC 协议?
How to implement an IPC protocol using Boost ASIO?
我正在尝试为将使用 Boost ASIO 构建的项目实施一个简单的 IPC 协议。这个想法是通过 IP/TCP 与具有后端的服务器和将使用从服务器接收的数据构建前端的客户端进行通信。整个 session 会是这样的:
- 连接已建立
- 客户端发送一个 2 字节的数据包,其中包含一些信息,服务器将使用这些信息来构建其响应(这存储为结构
propertiesPacket
)
- 服务器处理接收到的数据并将输出存储在名为
processedData
的可变大小结构中
- 服务器发送一个 2 字节无符号整数,指示客户端它将接收的结构的大小(假设该结构的大小为
n
字节)
- 服务器将结构数据作为
n
字节数据包发送
- 连接结束
我尝试自己实现这个,遵循 Boost ASIO 文档中提供的精彩教程,以及库中包含的示例和我在 Github 上找到的一些回购协议,但这是我的第一手资料使用网络和 IPC,我无法让它工作,我的客户端 returns 一个异常,说连接被对等方重置。
我现在有的是:
// File client.cpp
int main(int argc, char *argv[])
{
try {
propertiesPacket properties;
// ...
// We set the data inside the properties struct
// ...
boost::asio::io_context io;
boost::asio::ip::tcp::socket socket(io);
boost::asio::ip::tcp::resolver resolver(io);
boost::asio::connect(socket, resolver.resolve(argv[1], argv[2]));
boost::asio::write(socket, boost::asio::buffer(&properties, sizeof(propertiesPacket)));
unsigned short responseSize {};
boost::asio::read(socket, boost::asio::buffer(&responseSize, sizeof(short)));
processedData* response = reinterpret_cast<processedData*>(malloc(responseSize));
boost::asio::read(socket, boost::asio::buffer(response, responseSize));
// ...
// The client handles the data
// ...
return 0;
} catch (std::exception &e) {
std::cerr << e.what() << std::endl;
}
}
// File server.cpp
class ServerConnection
: public std::enable_shared_from_this<ServerConnection>
{
public:
using TCPSocket = boost::asio::ip::tcp::socket;
ServerConnection::ServerConnection(TCPSocket socket)
: socket_(std::move(socket)),
properties_(nullptr),
filePacket_(nullptr),
filePacketSize_(0)
{
}
void start() { doRead(); }
private:
void doRead()
{
auto self(shared_from_this());
socket_.async_read_some(boost::asio::buffer(properties_, sizeof(propertiesPacket)),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec) {
processData();
doWrite(&filePacketSize_, sizeof(short));
doWrite(filePacket_, sizeof(*filePacket_));
}
});
}
void doWrite(void* data, size_t length)
{
auto self(shared_from_this());
boost::asio::async_write(socket_, boost::asio::buffer(data, length),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec) { doRead(); }
});
}
void processData()
{ /* Data is processed */ }
TCPSocket socket_;
propertiesPacket* properties_;
processedData* filePacket_;
short filePacketSize_;
};
class Server
{
public:
using IOContext = boost::asio::io_context;
using TCPSocket = boost::asio::ip::tcp::socket;
using TCPAcceptor = boost::asio::ip::tcp::acceptor;
Server::Server(IOContext& io, short port)
: socket_(io),
acceptor_(io, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
{
doAccept();
}
private:
void doAccept()
{
acceptor_.async_accept(socket_,
[this](boost::system::error_code ec)
{
if (!ec) {
std::make_shared<ServerConnection>(std::move(socket_))->start();
}
doAccept();
});
}
TCPSocket socket_;
TCPAcceptor acceptor_;
};
我做错了什么?我的猜测是,在 doRead
函数内部,多次调用 doWrite
函数,当该函数随后也调用 doRead
时,部分是导致问题的原因,但我不知道是什么多次异步写入数据的正确方法是。但我也确信这不是我的代码中唯一没有按照我认为应该的方式运行的部分。
除了我显示的代码问题外,确实还有你怀疑的问题:
My guess is that inside the doRead function, calling multiple times the doWrite function, when that function then also calls doRead is in part what's causing problems
“doRead”在同一个函数中的事实不一定是问题(那只是全双工套接字 IO)。然而,“多次调用”是。见 docs:
This operation is implemented in terms of zero or more calls to the stream's async_write_some
function, and is known as a composed operation. The program must ensure that the stream performs no other write operations (such as async_write
, the stream's async_write_some
function, or any other composed operations that perform writes) until this operation completes.
通常的方法是将整个消息放在一个缓冲区中,但如果复制起来“成本高昂”,您可以使用 BufferSequence,称为 scatter/gather buffers。
具体来说,您将替换
doWrite(&filePacketSize_, sizeof(short));
doWrite(filePacket_, sizeof(*filePacket_));
类似
std::vector<boost::asio::const_buffer> msg{
boost::asio::buffer(&filePacketSize_, sizeof(short)),
boost::asio::buffer(filePacket_, sizeof(*filePacket_)),
};
doWrite(msg);
Note that this assumes that filePacketSize
and filePacket
have been assigned proper values!
您当然可以修改 do_write
以接受缓冲区序列:
template <typename Buffers> void doWrite(Buffers msg)
{
auto self(shared_from_this());
boost::asio::async_write(
socket_, msg,
[this, self](boost::system::error_code ec, std::size_t /*length*/) {
if (!ec) {
doRead();
}
});
}
但在你的情况下,我会通过内联主体来简化(现在你不会多次调用它)。
边注
不要使用 new
或 delete
。切勿在 C++ 中使用 malloc
。永远不要使用 reinterpret_cast<>
(标准允许的极少数例外情况除外!)。而不是
processedData* response = reinterpret_cast<processedData*>(malloc(responseSize));
只需使用
processedData response;
(可选择添加 {}
用于聚合的值初始化)。如果您需要可变长度的消息,请考虑在消息中放置一个向量或数组。当然,数组是固定长度的,但它保留了 POD 特性,因此使用起来可能更容易。如果你使用矢量,你会想要一个 scatter/gather 读入一个缓冲区序列,就像我上面为写入端显示的那样。
与其在不一致的 short
和 unsigned short
类型之间重新解释,不如将类型拼写为标准大小:std::uint16_t
无处不在。
请记住,您没有考虑字节顺序,因此您的协议将无法跨 compilers/architectures.
移植
临时修复
这是我在查看您共享的代码后最终得到的列表。
#include <boost/asio.hpp>
#include <iostream>
namespace ba = boost::asio;
using boost::asio::ip::tcp;
using boost::system::error_code;
using TCPSocket = tcp::socket;
struct processedData { };
struct propertiesPacket { };
// File server.cpp
class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
public:
ServerConnection(TCPSocket socket) : socket_(std::move(socket))
{ }
void start() {
std::clog << __PRETTY_FUNCTION__ << std::endl;
doRead();
}
private:
void doRead()
{
std::clog << __PRETTY_FUNCTION__ << std::endl;
auto self(shared_from_this());
socket_.async_read_some(
ba::buffer(&properties_, sizeof(properties_)),
[this, self](error_code ec, std::size_t length) {
std::clog << "received: " << length << std::endl;
if (!ec) {
processData();
std::vector<ba::const_buffer> msg{
ba::buffer(&filePacketSize_, sizeof(uint16_t)),
ba::buffer(&filePacket_, filePacketSize_),
};
ba::async_write(socket_, msg,
[this, self = shared_from_this()](
error_code ec, std::size_t length) {
std::clog << " written: " << length
<< std::endl;
if (!ec) {
doRead();
}
});
}
});
}
void processData() {
std::clog << __PRETTY_FUNCTION__ << std::endl;
/* Data is processed */
}
TCPSocket socket_;
propertiesPacket properties_{};
processedData filePacket_{};
uint16_t filePacketSize_ = sizeof(filePacket_);
};
class Server
{
public:
using IOContext = ba::io_context;
using TCPAcceptor = tcp::acceptor;
Server(IOContext& io, uint16_t port)
: socket_(io)
, acceptor_(io, {tcp::v4(), port})
{
doAccept();
}
private:
void doAccept()
{
std::clog << __PRETTY_FUNCTION__ << std::endl;
acceptor_.async_accept(socket_, [this](error_code ec) {
if (!ec) {
std::clog << "Accepted " << socket_.remote_endpoint()
<< std::endl;
std::make_shared<ServerConnection>(std::move(socket_))->start();
doAccept();
} else {
std::clog << "Accept " << ec.message() << std::endl;
}
});
}
TCPSocket socket_;
TCPAcceptor acceptor_;
};
// File client.cpp
int main(int argc, char *argv[])
{
ba::io_context io;
Server s{io, 6869};
std::thread server_thread{[&io] {
io.run();
}};
// always check argc!
std::vector<std::string> args(argv, argv + argc);
if (args.size() == 1)
args = {"demo", "127.0.0.1", "6869"};
// avoid race with server accept thread
post(io, [&io, args] {
try {
propertiesPacket properties;
// ...
// We set the data inside the properties struct
// ...
tcp::socket socket(io);
tcp::resolver resolver(io);
connect(socket, resolver.resolve(args.at(1), args.at(2)));
write(socket, ba::buffer(&properties, sizeof(properties)));
uint16_t responseSize{};
ba::read(socket, ba::buffer(&responseSize, sizeof(uint16_t)));
std::clog << "Client responseSize: " << responseSize << std::endl;
processedData response{};
assert(responseSize <= sizeof(response));
ba::read(socket, ba::buffer(&response, responseSize));
// ...
// The client handles the data
// ...
// for online demo:
io.stop();
} catch (std::exception const& e) {
std::clog << e.what() << std::endl;
}
});
io.run_one();
server_thread.join();
}
正在打印类似于
的内容
void Server::doAccept()
Server::doAccept()::<lambda(boost::system::error_code)> Success
void ServerConnection::start()
void ServerConnection::doRead()
void Server::doAccept()
received: 1
void ServerConnection::processData()
written: 3
void ServerConnection::doRead()
Client responseSize: 1
我正在尝试为将使用 Boost ASIO 构建的项目实施一个简单的 IPC 协议。这个想法是通过 IP/TCP 与具有后端的服务器和将使用从服务器接收的数据构建前端的客户端进行通信。整个 session 会是这样的:
- 连接已建立
- 客户端发送一个 2 字节的数据包,其中包含一些信息,服务器将使用这些信息来构建其响应(这存储为结构
propertiesPacket
) - 服务器处理接收到的数据并将输出存储在名为
processedData
的可变大小结构中
- 服务器发送一个 2 字节无符号整数,指示客户端它将接收的结构的大小(假设该结构的大小为
n
字节) - 服务器将结构数据作为
n
字节数据包发送 - 连接结束
我尝试自己实现这个,遵循 Boost ASIO 文档中提供的精彩教程,以及库中包含的示例和我在 Github 上找到的一些回购协议,但这是我的第一手资料使用网络和 IPC,我无法让它工作,我的客户端 returns 一个异常,说连接被对等方重置。
我现在有的是:
// File client.cpp
int main(int argc, char *argv[])
{
try {
propertiesPacket properties;
// ...
// We set the data inside the properties struct
// ...
boost::asio::io_context io;
boost::asio::ip::tcp::socket socket(io);
boost::asio::ip::tcp::resolver resolver(io);
boost::asio::connect(socket, resolver.resolve(argv[1], argv[2]));
boost::asio::write(socket, boost::asio::buffer(&properties, sizeof(propertiesPacket)));
unsigned short responseSize {};
boost::asio::read(socket, boost::asio::buffer(&responseSize, sizeof(short)));
processedData* response = reinterpret_cast<processedData*>(malloc(responseSize));
boost::asio::read(socket, boost::asio::buffer(response, responseSize));
// ...
// The client handles the data
// ...
return 0;
} catch (std::exception &e) {
std::cerr << e.what() << std::endl;
}
}
// File server.cpp
class ServerConnection
: public std::enable_shared_from_this<ServerConnection>
{
public:
using TCPSocket = boost::asio::ip::tcp::socket;
ServerConnection::ServerConnection(TCPSocket socket)
: socket_(std::move(socket)),
properties_(nullptr),
filePacket_(nullptr),
filePacketSize_(0)
{
}
void start() { doRead(); }
private:
void doRead()
{
auto self(shared_from_this());
socket_.async_read_some(boost::asio::buffer(properties_, sizeof(propertiesPacket)),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec) {
processData();
doWrite(&filePacketSize_, sizeof(short));
doWrite(filePacket_, sizeof(*filePacket_));
}
});
}
void doWrite(void* data, size_t length)
{
auto self(shared_from_this());
boost::asio::async_write(socket_, boost::asio::buffer(data, length),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec) { doRead(); }
});
}
void processData()
{ /* Data is processed */ }
TCPSocket socket_;
propertiesPacket* properties_;
processedData* filePacket_;
short filePacketSize_;
};
class Server
{
public:
using IOContext = boost::asio::io_context;
using TCPSocket = boost::asio::ip::tcp::socket;
using TCPAcceptor = boost::asio::ip::tcp::acceptor;
Server::Server(IOContext& io, short port)
: socket_(io),
acceptor_(io, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
{
doAccept();
}
private:
void doAccept()
{
acceptor_.async_accept(socket_,
[this](boost::system::error_code ec)
{
if (!ec) {
std::make_shared<ServerConnection>(std::move(socket_))->start();
}
doAccept();
});
}
TCPSocket socket_;
TCPAcceptor acceptor_;
};
我做错了什么?我的猜测是,在 doRead
函数内部,多次调用 doWrite
函数,当该函数随后也调用 doRead
时,部分是导致问题的原因,但我不知道是什么多次异步写入数据的正确方法是。但我也确信这不是我的代码中唯一没有按照我认为应该的方式运行的部分。
除了我
My guess is that inside the doRead function, calling multiple times the doWrite function, when that function then also calls doRead is in part what's causing problems
“doRead”在同一个函数中的事实不一定是问题(那只是全双工套接字 IO)。然而,“多次调用”是。见 docs:
This operation is implemented in terms of zero or more calls to the stream's
async_write_some
function, and is known as a composed operation. The program must ensure that the stream performs no other write operations (such asasync_write
, the stream'sasync_write_some
function, or any other composed operations that perform writes) until this operation completes.
通常的方法是将整个消息放在一个缓冲区中,但如果复制起来“成本高昂”,您可以使用 BufferSequence,称为 scatter/gather buffers。
具体来说,您将替换
doWrite(&filePacketSize_, sizeof(short));
doWrite(filePacket_, sizeof(*filePacket_));
类似
std::vector<boost::asio::const_buffer> msg{
boost::asio::buffer(&filePacketSize_, sizeof(short)),
boost::asio::buffer(filePacket_, sizeof(*filePacket_)),
};
doWrite(msg);
Note that this assumes that
filePacketSize
andfilePacket
have been assigned proper values!
您当然可以修改 do_write
以接受缓冲区序列:
template <typename Buffers> void doWrite(Buffers msg)
{
auto self(shared_from_this());
boost::asio::async_write(
socket_, msg,
[this, self](boost::system::error_code ec, std::size_t /*length*/) {
if (!ec) {
doRead();
}
});
}
但在你的情况下,我会通过内联主体来简化(现在你不会多次调用它)。
边注
不要使用 new
或 delete
。切勿在 C++ 中使用 malloc
。永远不要使用 reinterpret_cast<>
(标准允许的极少数例外情况除外!)。而不是
processedData* response = reinterpret_cast<processedData*>(malloc(responseSize));
只需使用
processedData response;
(可选择添加 {}
用于聚合的值初始化)。如果您需要可变长度的消息,请考虑在消息中放置一个向量或数组
与其在不一致的 short
和 unsigned short
类型之间重新解释,不如将类型拼写为标准大小:std::uint16_t
无处不在。
请记住,您没有考虑字节顺序,因此您的协议将无法跨 compilers/architectures.
临时修复
这是我在查看您共享的代码后最终得到的列表。
#include <boost/asio.hpp>
#include <iostream>
namespace ba = boost::asio;
using boost::asio::ip::tcp;
using boost::system::error_code;
using TCPSocket = tcp::socket;
struct processedData { };
struct propertiesPacket { };
// File server.cpp
class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
public:
ServerConnection(TCPSocket socket) : socket_(std::move(socket))
{ }
void start() {
std::clog << __PRETTY_FUNCTION__ << std::endl;
doRead();
}
private:
void doRead()
{
std::clog << __PRETTY_FUNCTION__ << std::endl;
auto self(shared_from_this());
socket_.async_read_some(
ba::buffer(&properties_, sizeof(properties_)),
[this, self](error_code ec, std::size_t length) {
std::clog << "received: " << length << std::endl;
if (!ec) {
processData();
std::vector<ba::const_buffer> msg{
ba::buffer(&filePacketSize_, sizeof(uint16_t)),
ba::buffer(&filePacket_, filePacketSize_),
};
ba::async_write(socket_, msg,
[this, self = shared_from_this()](
error_code ec, std::size_t length) {
std::clog << " written: " << length
<< std::endl;
if (!ec) {
doRead();
}
});
}
});
}
void processData() {
std::clog << __PRETTY_FUNCTION__ << std::endl;
/* Data is processed */
}
TCPSocket socket_;
propertiesPacket properties_{};
processedData filePacket_{};
uint16_t filePacketSize_ = sizeof(filePacket_);
};
class Server
{
public:
using IOContext = ba::io_context;
using TCPAcceptor = tcp::acceptor;
Server(IOContext& io, uint16_t port)
: socket_(io)
, acceptor_(io, {tcp::v4(), port})
{
doAccept();
}
private:
void doAccept()
{
std::clog << __PRETTY_FUNCTION__ << std::endl;
acceptor_.async_accept(socket_, [this](error_code ec) {
if (!ec) {
std::clog << "Accepted " << socket_.remote_endpoint()
<< std::endl;
std::make_shared<ServerConnection>(std::move(socket_))->start();
doAccept();
} else {
std::clog << "Accept " << ec.message() << std::endl;
}
});
}
TCPSocket socket_;
TCPAcceptor acceptor_;
};
// File client.cpp
int main(int argc, char *argv[])
{
ba::io_context io;
Server s{io, 6869};
std::thread server_thread{[&io] {
io.run();
}};
// always check argc!
std::vector<std::string> args(argv, argv + argc);
if (args.size() == 1)
args = {"demo", "127.0.0.1", "6869"};
// avoid race with server accept thread
post(io, [&io, args] {
try {
propertiesPacket properties;
// ...
// We set the data inside the properties struct
// ...
tcp::socket socket(io);
tcp::resolver resolver(io);
connect(socket, resolver.resolve(args.at(1), args.at(2)));
write(socket, ba::buffer(&properties, sizeof(properties)));
uint16_t responseSize{};
ba::read(socket, ba::buffer(&responseSize, sizeof(uint16_t)));
std::clog << "Client responseSize: " << responseSize << std::endl;
processedData response{};
assert(responseSize <= sizeof(response));
ba::read(socket, ba::buffer(&response, responseSize));
// ...
// The client handles the data
// ...
// for online demo:
io.stop();
} catch (std::exception const& e) {
std::clog << e.what() << std::endl;
}
});
io.run_one();
server_thread.join();
}
正在打印类似于
的内容void Server::doAccept()
Server::doAccept()::<lambda(boost::system::error_code)> Success
void ServerConnection::start()
void ServerConnection::doRead()
void Server::doAccept()
received: 1
void ServerConnection::processData()
written: 3
void ServerConnection::doRead()
Client responseSize: 1