通过 boost::serialization 序列化多个 std::shared_ptr 并通过 boost::asio 发送
Serializing multiple std::shared_ptr via boost::serialization and sending over boost::asio
我想通过 boost asio 将一个 shared_ptr
对象从客户端传输到服务器。这是我的代码:
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/asio.hpp>
#include <boost/serialization/export.hpp>
#include <boost/serialization/shared_ptr.hpp>
#include <chrono>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
using namespace std;
class Message {
public:
Message() {
}
virtual ~Message() {
}
string text;
private:
friend class boost::serialization::access;
template <class Archive>
void serialize(Archive &ar, const unsigned int version) {
ar &text;
}
};
BOOST_CLASS_EXPORT(Message)
void runClient() {
// Give server time to startup
this_thread::sleep_for(chrono::milliseconds(3000));
boost::asio::ip::tcp::iostream stream("localhost", "3000");
boost::archive::text_oarchive archive(stream);
for (int i = 0; i < 10; i++) {
std::shared_ptr<Message> dl = std::make_shared<Message>();
stringstream ss;
ss << "Hello " << i;
dl->text = ss.str();
archive << dl;
}
stream.close();
cout << "Client shutdown" << endl;
}
void handleIncommingClientConnection(boost::asio::ip::tcp::acceptor &acceptor) {
boost::asio::ip::tcp::iostream stream;
acceptor.accept(*stream.rdbuf());
boost::archive::text_iarchive archive(stream);
while (true) {
std::shared_ptr<Message> m;
try {
archive >> m;
cout << m->text << endl;
} catch (std::exception &ex) {
cout << ex.what() << endl;
if (stream.eof()) {
cout << "eof" << endl;
stream.close();
cout << "Server: shutdown client handling..." << endl;
break;
} else
throw ex;
}
}
}
void runServer() {
boost::asio::io_service ios;
boost::asio::ip::tcp::endpoint endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 3000);
boost::asio::ip::tcp::acceptor acceptor(ios, endpoint);
handleIncommingClientConnection(acceptor);
}
int main(int argc, char **argv) {
thread clientThread(runClient);
thread serverThread(runServer);
clientThread.join();
serverThread.join();
return 0;
}
程序输出如下:
Hello 0
Hello 1
Hello 2
Hello 3
Hello 3
Hello 3
Hello 3
Hello 3
Client shutdown
Hello 3
Hello 3
input stream error
eof
Server: shutdown client handling...
我期待以下输出:
Hello 0
Hello 1
Hello 2
Hello 3
Hello 4
Hello 5
Hello 6
Hello 7
Client shutdown
Hello 8
Hello 9
input stream error
eof
Server: shutdown client handling...
将 shared_ptr
更改为简单对象(std::shared_ptr<Message> m;
到 Message m
)时,一切都按预期工作。我要坚持shared_ptr
。我需要更改什么?
单独序列化似乎可行:
stringstream stream;
{
boost::archive::text_oarchive archive(stream);
std::shared_ptr<Message> dl = std::make_shared<Message>();
stringstream ss;
ss << "Hello World!";
dl->text = ss.str();
archive << dl;
}
{
boost::archive::text_iarchive archive(stream);
std::shared_ptr<Message> m;
archive >> m;
cout << m->text << endl;
}
输出:Hello World!
您遇到的问题是由于 object tracking done by Boost.Serialization。
Depending on how the class is used and other factors, serialized
objects may be tracked by memory address. This prevents the same
object from being written to or read from an archive multiple times.
These stored addresses can also be used to delete objects created
during a loading process that has been interrupted by throwing of an
exception.
文档实际上预示了这个特定问题的发生:
This could cause problems in progams[sic] where the copies of different
objects are saved from the same address.
此外,Class Serialization Traits 对象跟踪文档告诉我们,在这种特定情况下,对象跟踪已启用:
Default tracking traits are:
- For primitive, track_never.
- For pointers, track_never. That is, addresses of addresses are not tracked by default.
- All current serialization wrappers such as boost::serialization::nvp, track_never.
- For all other types, track_selectively. That is addresses of serialized objects are tracked if and only if one or more of the
following is true:
- an object of this type is anywhere in the program serialized through a pointer.
- the class is explicitly "exported" - see below.
- the class is explicitly "registered" in the archive
回到你的情况——在客户端,由于你的循环体的编写方式,第 5 个(及以下)Message
实例被分配在与第 4 个相同的地址 Message
实例。您可以通过在每次迭代中检查 dl.get()
的值来验证这一点。 (在我对 coliru 的测试中,所有的实例都分配在同一个地址,所以 YMMV)。
由于对象跟踪的工作原理,所有这些 shared_ptr
实例都被认为指向同一个 Message
实例(即使您同时更改了值——库不希望发生这种情况), 所以额外的事件只是作为额外的参考序列化。反序列化后...老实说,这有内存泄漏的味道 and/or 悬空引用问题(意见,尚未对此进行详细调查)。
总而言之,所示代码的主要问题是它破坏了序列化库的先决条件,即您正在序列化某些常量状态,而在反序列化时您会重新创建相同的状态。
解决此问题的一种方法是初始化 std::vector
of shared_ptr<Message>
,其中包含要在此特定事务中传输的所有消息。同样,您将反序列化另一侧的整个向量。如果您希望有一些持久连接,则将框架添加到协议中,每个框架包含一个包含一个消息序列的存档。
进行最少的代码修改以实现此功能——添加 include
#include <boost/serialization/vector.hpp>
将runClient()
改成这样:
void runClient() {
// Give server time to startup
this_thread::sleep_for(chrono::milliseconds(3000));
boost::asio::ip::tcp::iostream stream("127.0.0.1", "3000");
std::vector<std::shared_ptr<Message>> messages;
for (int i = 0; i < 10; i++) {
std::shared_ptr<Message> dl = std::make_shared<Message>();
stringstream ss;
ss << "Hello " << i;
dl->text = ss.str();
messages.emplace_back(dl);
}
boost::archive::text_oarchive archive(stream);
archive << messages;
stream.close();
cout << "Client shutdown" << endl;
}
然后将 handleIncommingClientConnection(...)
更改为:
void handleIncommingClientConnection(boost::asio::ip::tcp::acceptor &acceptor) {
boost::asio::ip::tcp::iostream stream;
acceptor.accept(*stream.rdbuf());
boost::archive::text_iarchive archive(stream);
while (true) {
try {
std::vector<std::shared_ptr<Message>> messages;
archive >> messages;
for (auto const& m : messages) {
cout << m->text << endl;
}
} catch (std::exception &ex) {
cout << ex.what() << endl;
if (stream.eof()) {
cout << "eof" << endl;
stream.close();
cout << "Server: shutdown client handling..." << endl;
break;
} else
throw ex;
}
}
}
NB:这不会添加对多帧的任何支持——客户端应该在发送一个消息向量后关闭连接,否则行为未定义.
更多资源:
- boost serialization multiple objects
我想通过 boost asio 将一个 shared_ptr
对象从客户端传输到服务器。这是我的代码:
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/asio.hpp>
#include <boost/serialization/export.hpp>
#include <boost/serialization/shared_ptr.hpp>
#include <chrono>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
using namespace std;
class Message {
public:
Message() {
}
virtual ~Message() {
}
string text;
private:
friend class boost::serialization::access;
template <class Archive>
void serialize(Archive &ar, const unsigned int version) {
ar &text;
}
};
BOOST_CLASS_EXPORT(Message)
void runClient() {
// Give server time to startup
this_thread::sleep_for(chrono::milliseconds(3000));
boost::asio::ip::tcp::iostream stream("localhost", "3000");
boost::archive::text_oarchive archive(stream);
for (int i = 0; i < 10; i++) {
std::shared_ptr<Message> dl = std::make_shared<Message>();
stringstream ss;
ss << "Hello " << i;
dl->text = ss.str();
archive << dl;
}
stream.close();
cout << "Client shutdown" << endl;
}
void handleIncommingClientConnection(boost::asio::ip::tcp::acceptor &acceptor) {
boost::asio::ip::tcp::iostream stream;
acceptor.accept(*stream.rdbuf());
boost::archive::text_iarchive archive(stream);
while (true) {
std::shared_ptr<Message> m;
try {
archive >> m;
cout << m->text << endl;
} catch (std::exception &ex) {
cout << ex.what() << endl;
if (stream.eof()) {
cout << "eof" << endl;
stream.close();
cout << "Server: shutdown client handling..." << endl;
break;
} else
throw ex;
}
}
}
void runServer() {
boost::asio::io_service ios;
boost::asio::ip::tcp::endpoint endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 3000);
boost::asio::ip::tcp::acceptor acceptor(ios, endpoint);
handleIncommingClientConnection(acceptor);
}
int main(int argc, char **argv) {
thread clientThread(runClient);
thread serverThread(runServer);
clientThread.join();
serverThread.join();
return 0;
}
程序输出如下:
Hello 0
Hello 1
Hello 2
Hello 3
Hello 3
Hello 3
Hello 3
Hello 3
Client shutdown
Hello 3
Hello 3
input stream error
eof
Server: shutdown client handling...
我期待以下输出:
Hello 0
Hello 1
Hello 2
Hello 3
Hello 4
Hello 5
Hello 6
Hello 7
Client shutdown
Hello 8
Hello 9
input stream error
eof
Server: shutdown client handling...
将 shared_ptr
更改为简单对象(std::shared_ptr<Message> m;
到 Message m
)时,一切都按预期工作。我要坚持shared_ptr
。我需要更改什么?
单独序列化似乎可行:
stringstream stream;
{
boost::archive::text_oarchive archive(stream);
std::shared_ptr<Message> dl = std::make_shared<Message>();
stringstream ss;
ss << "Hello World!";
dl->text = ss.str();
archive << dl;
}
{
boost::archive::text_iarchive archive(stream);
std::shared_ptr<Message> m;
archive >> m;
cout << m->text << endl;
}
输出:Hello World!
您遇到的问题是由于 object tracking done by Boost.Serialization。
Depending on how the class is used and other factors, serialized objects may be tracked by memory address. This prevents the same object from being written to or read from an archive multiple times. These stored addresses can also be used to delete objects created during a loading process that has been interrupted by throwing of an exception.
文档实际上预示了这个特定问题的发生:
This could cause problems in progams[sic] where the copies of different objects are saved from the same address.
此外,Class Serialization Traits 对象跟踪文档告诉我们,在这种特定情况下,对象跟踪已启用:
Default tracking traits are:
- For primitive, track_never.
- For pointers, track_never. That is, addresses of addresses are not tracked by default.
- All current serialization wrappers such as boost::serialization::nvp, track_never.
- For all other types, track_selectively. That is addresses of serialized objects are tracked if and only if one or more of the following is true:
- an object of this type is anywhere in the program serialized through a pointer.
- the class is explicitly "exported" - see below.
- the class is explicitly "registered" in the archive
回到你的情况——在客户端,由于你的循环体的编写方式,第 5 个(及以下)Message
实例被分配在与第 4 个相同的地址 Message
实例。您可以通过在每次迭代中检查 dl.get()
的值来验证这一点。 (在我对 coliru 的测试中,所有的实例都分配在同一个地址,所以 YMMV)。
由于对象跟踪的工作原理,所有这些 shared_ptr
实例都被认为指向同一个 Message
实例(即使您同时更改了值——库不希望发生这种情况), 所以额外的事件只是作为额外的参考序列化。反序列化后...老实说,这有内存泄漏的味道 and/or 悬空引用问题(意见,尚未对此进行详细调查)。
总而言之,所示代码的主要问题是它破坏了序列化库的先决条件,即您正在序列化某些常量状态,而在反序列化时您会重新创建相同的状态。
解决此问题的一种方法是初始化 std::vector
of shared_ptr<Message>
,其中包含要在此特定事务中传输的所有消息。同样,您将反序列化另一侧的整个向量。如果您希望有一些持久连接,则将框架添加到协议中,每个框架包含一个包含一个消息序列的存档。
进行最少的代码修改以实现此功能——添加 include
#include <boost/serialization/vector.hpp>
将runClient()
改成这样:
void runClient() {
// Give server time to startup
this_thread::sleep_for(chrono::milliseconds(3000));
boost::asio::ip::tcp::iostream stream("127.0.0.1", "3000");
std::vector<std::shared_ptr<Message>> messages;
for (int i = 0; i < 10; i++) {
std::shared_ptr<Message> dl = std::make_shared<Message>();
stringstream ss;
ss << "Hello " << i;
dl->text = ss.str();
messages.emplace_back(dl);
}
boost::archive::text_oarchive archive(stream);
archive << messages;
stream.close();
cout << "Client shutdown" << endl;
}
然后将 handleIncommingClientConnection(...)
更改为:
void handleIncommingClientConnection(boost::asio::ip::tcp::acceptor &acceptor) {
boost::asio::ip::tcp::iostream stream;
acceptor.accept(*stream.rdbuf());
boost::archive::text_iarchive archive(stream);
while (true) {
try {
std::vector<std::shared_ptr<Message>> messages;
archive >> messages;
for (auto const& m : messages) {
cout << m->text << endl;
}
} catch (std::exception &ex) {
cout << ex.what() << endl;
if (stream.eof()) {
cout << "eof" << endl;
stream.close();
cout << "Server: shutdown client handling..." << endl;
break;
} else
throw ex;
}
}
}
NB:这不会添加对多帧的任何支持——客户端应该在发送一个消息向量后关闭连接,否则行为未定义.
更多资源:
- boost serialization multiple objects