尝试使用 boost::serialization 和存档反序列化对象时得到 "input stream error"
Getting "input stream error" when trying to deserialize the object using boost::serialization and archive
我正在尝试使用 boost::serialization、boost::archive 和拆分成员函数(split member,即 load 和 save),通过 boost::interprocess::message_queue 发送一个类的对象。
问题是,当我尝试反序列化时,会得到 input stream error 异常。
#include <iostream>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/version.hpp>
#include <random>
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/serialization/access.hpp>
#include <boost/archive/impl/basic_text_oarchive.ipp>
#include <boost/archive/impl/text_oarchive_impl.ipp>
#include <boost/archive/impl/text_iarchive_impl.ipp>
#include <boost/serialization/split_member.hpp>
/// Message payload serialized with Boost.Serialization through split
/// save()/load() member functions.
class Data{
public:
    // Value-initialized so that an object whose load() failed (or never ran)
    // can still be printed without reading indeterminate values.
    int a_{};
    double b_{};
    std::string s;

    /// Forwards to save() or load() via boost::serialization::split_member.
    /// @param ar       archive being written to or read from
    /// @param version  class version passed through to save()/load()
    template<class Archive>
    void serialize(Archive & ar, const unsigned int version) {
        boost::serialization::split_member(ar, *this, version);
    }

    /// Writes a_, b_ and s to the archive, in that order.
    template<class Archive> void save(Archive & ar, unsigned int /*version*/) const {
        ar << a_;
        ar << b_;
        ar << s;
    }

    /// Reads a_, b_ and s from the archive, in the same order save() wrote them.
    template<class Archive> void load(Archive & ar, unsigned int /*version*/) {
        ar >> a_;
        ar >> b_;
        ar >> s;
    }
private:
    // boost::serialization::access is the class the library uses to reach
    // serialize()/save()/load(); with everything public this friend
    // declaration is optional.
    friend class boost::serialization::access;
};
// Receiver side of the example: opens the queue named "message_queue" and, in
// an endless loop, receives a message into a 150-byte string, prints it,
// appends it to a string stream and tries to deserialize a Data from that
// stream, printing any exception text and then the fields of d.
//
// Returns: both catch handlers execute `return 1;`, which converts to true in
// this bool function, so every return path yields true.
//
// NOTE(review): the while(true) loop contains no break, so the statements after
// the try/catch are reached only if an exception handler does not return.
[[nodiscard]]bool RunChiled() {
using namespace boost::interprocess; try {
// open_only: the queue is opened, not created, here.
message_queue mq(open_only //open or create
, "message_queue" //name
);
unsigned int priority = 0;
message_queue::size_type recvd_size;
Data d;
// NOTE(review): iss lives outside the loop, so each received message is
// appended to whatever the previous iterations left in the stream.
std::stringstream iss;
std::string serialized_string;
serialized_string.resize(150);
// NOTE(review): number is incremented below but never read.
long long number = 0;
while(true)
{
// NOTE(review): recvd_size is filled in by receive() but never used; the
// string keeps its size of 150, so all 150 characters are printed and
// inserted into iss regardless of how many bytes arrived.
mq.receive(&serialized_string[0], 150, recvd_size , priority);
std::cout << serialized_string << "\n";
iss << serialized_string;
try{
boost::archive::text_iarchive ia(iss); // <-- getting the exception
ia >> d;
} catch (const std::exception& ex) {
// Deserialization failures are reported and the loop keeps going.
std::cout << ex.what() << "\n";
}
number++;
std::cout << d.a_ << " " << d.b_ << " " << d.s << "\n";
} }catch(const interprocess_exception &ex) {
message_queue::remove("message_queue");
std::cout << "interprocess_exception " << ex.what() << std::endl;
return 1; } catch (const std::exception& e) {
std::cout << "exception " << e.what() << std::endl;
message_queue::remove("message_queue");
return 1; }
message_queue::remove("message_queue"); return true; }
// Entry point of the example: removes any queue named "message_queue", forks,
// then runs RunChiled() in the branch where fork() returned a positive pid and
// creates the queue and sends one serialized Data in the branch where it
// returned 0.
//
// NOTE(review): generator and distribution are declared but not used in this
// excerpt.
int main() { std::cout << "1\n"; std::default_random_engine generator; std::uniform_real_distribution<double> distribution(0,15);
using namespace std;
cout << "Boost version: " << BOOST_LIB_VERSION << endl; using namespace boost::interprocess; message_queue::remove("message_queue"); auto pid = fork();
if(pid > 0) {
std::cout << "2\n";
// Waits two seconds before opening the queue (presumably to let the other
// process create it first - confirm).
sleep(2);
try {
auto res = RunChiled();
std::cout << res;
} catch (...) {
std::cout << "error\n";
}
} else if(pid == 0) {
try{
// Queue created with a maximum of 100 messages of at most 150 bytes each.
boost::interprocess::message_queue mq(create_only,"message_queue", 100, 150);
std::stringstream oss;
Data request;
request.b_ = 17.5;
// NOTE(review): `I` and `i` are not declared anywhere in this excerpt, and the
// closing brace before the final catch has no visible opener - presumably a
// `for` loop header was lost when the code was pasted; confirm against the
// original post.
request.a_ = I;
request.s = to_string(17.5) + " " + to_string(i);
try {
boost::archive::text_oarchive oa(oss);
oa << request;
} catch (const std::exception& e) {
std::cout << "serialzation:" << e.what() ;
}
try{
// std::cout << "oss " << oss.str() << "\n";
std::string serialized_string(oss.str());
std::cout << "serialized_string " << oss.str().size() << "\n";
// NOTE(review): &serialized_string is the address of the std::string object
// itself, not of its character data (serialized_string.data()), so the bytes
// handed to send() are not the serialized archive text.
mq.send(&serialized_string, serialized_string.size(), 0);
}catch(const std::exception& e){
std::cout << "\n send exeption " << e.what() << "\n";
}
}
}catch (const std::exception& e){
message_queue::remove("message_queue");
std::cout << e.what() ;
}
}
return 0;
}
一些大问题。
首先
mq.send(&serialized_string, serialized_string.size(), 0);
这是未定义行为,因为 serialized_string 不是 POD 类型(&serialized_string 是 std::string 对象本身的地址,而不是其字符缓冲区的地址),而且大小也不匹配。你的本意可能是:
mq.send(serialized_string.data(), serialized_string.size(), 0);
在接收方:您将 serialized_string 的大小调整为 150,之后却再也没有改回消息的实际大小。这意味着它无法正常工作,因为末尾会带有多余的数据。
修复方法:
// Receive into a buffer of buffer_size NUL characters, then shrink the buffer
// to the number of bytes that receive() reported.
std::string buffer(buffer_size, '\0');
unsigned int priority = 0;
message_queue::size_type recvd_size = 0;
mq.receive(buffer.data(), buffer.size(), recvd_size, priority);
buffer.resize(recvd_size);
其他注意事项
不拆分序列化可以更简单:
// The same payload with one serialize() member in place of the separate
// save()/load() pair.
class Data {
public:
    int a_;
    double b_;
    std::string s;

    // Applies the archive's operator& to a_, b_ and s, in that order.
    template <class Ar>
    void serialize(Ar& archive, unsigned /*version*/)
    {
        archive & a_;
        archive & b_;
        archive & s;
    }

private:
    friend class boost::serialization::access; // not required
};
其他说明
返回 bool 的 RunChiled 中同时出现 return 1 和 return true,看起来很可疑——其中之一可能是错误。
对于一个通常不会返回的函数,[[nodiscard]] 似乎不太合适([[noreturn]] 可能更贴切,并可选择让异常向外传播?)。
你应该用真正随机的值来为随机数生成器设定种子:
// Seed the engine with a value drawn from std::random_device instead of
// default-constructing it.
std::default_random_engine generator { std::random_device{}() };
修复后的现场演示
#include <iostream>
#include <iomanip>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <boost/core/demangle.hpp>
#include <boost/version.hpp>
#include <random>
#include <thread> // this_thread
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/serialization/access.hpp>
#include <boost/serialization/split_member.hpp>
namespace {
    // File-local aliases, constants and helpers used by the functions below.
    using namespace std::chrono_literals;

    namespace bip = boost::interprocess;
    using bip::message_queue;

    // Name of the queue, its capacity in messages, and the per-message byte limit.
    constexpr char const* queuename = "message_queue";
    constexpr int max_queued = 5;
    constexpr int buffer_size = 150;

    // Thin wrapper so callers can write sleep_for(2s).
    auto sleep_for = [](auto duration) { std::this_thread::sleep_for(duration); };
}
/// Payload exchanged over the queue: an int, a double and a string, all
/// starting out zero/empty.
class Data {
public:
    int a_ = 0;
    double b_ = 0.0;
    std::string s;

    /// Applies the archive's operator& to a_, b_ and s, in that order.
    /// The version argument is ignored.
    template <class Archive>
    void serialize(Archive& archive, unsigned /*version*/)
    {
        archive & a_;
        archive & b_;
        archive & s;
    }
};
/// Receiver loop: opens the existing queue, then repeatedly receives one
/// message, deserializes it into a Data and prints its fields.
///
/// Never returns normally: the loop has no exit, so control leaves only by
/// exception. Any std::exception is logged with its demangled type name, the
/// queue is removed, and the exception is re-thrown to the caller.
[[noreturn]] void RunChild()
{
    std::cout << "2" << std::endl;
    sleep_for(2s);
    try {
        message_queue mq(bip::open_only, queuename);
        // Each iteration starts with a buffer of buffer_size bytes; it is
        // shrunk to the received size below and grown back by the loop step.
        for (std::string buffer(buffer_size, '\0');; buffer.resize(buffer_size)) {
            {
                unsigned int priority = 0;
                message_queue::size_type recvd_size = 0;
#ifdef COLIRU
                // make the process terminate for online compiler
                {
                    using namespace boost::posix_time;
                    auto deadline = second_clock::universal_time() + seconds(3);
                    if (not mq.timed_receive(buffer.data(), buffer.size(),
                                             recvd_size, priority, deadline))
                    {
                        throw std::runtime_error("no more messages");
                    }
                }
#else
                mq.receive(buffer.data(), buffer.size(), recvd_size, priority);
#endif
                // Keep only the bytes that were actually received.
                buffer.resize(recvd_size);
            }

            Data d;
            try {
                // A fresh stream per message, holding exactly that message.
                std::stringstream iss(buffer);
                boost::archive::text_iarchive ia(iss);
                ia >> d;
            } catch (const std::exception& ex) {
                // A message that fails to deserialize is reported; d keeps its
                // default values and is still printed below.
                std::cout << ex.what() << std::endl;
            }

            std::cout << "Received: " << d.a_ << " " << d.b_ << " "
                      << std::quoted(d.s) << std::endl;
        }
    } catch (const std::exception& e) {
        std::cout << boost::core::demangle(typeid(e).name()) << " " << e.what()
                  << std::endl;
        message_queue::remove(queuename);
        throw; // re-raise
    }
}
static void RunParent()
{
std::cout << "1" << std::endl;
std::default_random_engine generator { std::random_device{}() };
std::uniform_real_distribution<double> distribution(0, 15);
message_queue mq(bip::create_only, queuename, max_queued, buffer_size);
for (auto i = 0; i < 10; ++i) {
auto value = distribution(generator);
Data const request {
i,
value,
std::to_string(value) + " " + std::to_string(i)
};
std::stringstream oss;
try {
boost::archive::text_oarchive oa(oss);
oa << request;
std::string buffer = std::move(oss).str();
std::cout << "Sending " << buffer.size() << " bytes" << std::endl;
mq.send(buffer.data(), buffer.size(), 0);
} catch (const std::exception& e) {
std::cout << "\nsend exeption " << e.what() << std::endl;
}
}
}
/// Prints the Boost version, removes any existing queue with this name, then
/// forks: the branch where fork() returns a positive pid runs RunChild() (the
/// receiver) and the branch where it returns 0 runs RunParent() (the sender).
/// Returns 1 if fork() fails; exceptions escaping either branch are printed
/// and the queue is removed.
int main() {
    std::cout << "Boost version: " << BOOST_LIB_VERSION << std::endl;
    message_queue::remove(queuename);
    try {
        auto const pid = ::fork();
        if (pid > 0) {
            RunChild();
        } else if (pid == 0) {
            RunParent();
        } else {
            // fork() reports failure with a negative return value; no second
            // process exists, so there is nothing to run.
            std::cerr << "fork failed" << std::endl;
            return 1;
        }
    } catch (const std::exception& e) {
        std::cout << e.what() << std::endl;
        message_queue::remove(queuename);
    }
}
输出
Boost version: 1_75
2
1
Sending 72 bytes
Sending 73 bytes
Sending 72 bytes
Sending 72 bytes
Sending 73 bytes
Sending 72 bytes
Sending 72 bytes
Received: 0 1.12642 "1.126420 0"
Received: 1 14.2474 "14.247412 1"
Received: 2 3.22163 "3.221631 2"
Sending 73 bytes
Sending 72 bytes
Received: 3 3.20471 "3.204709 3"
Sending 72 bytes
Received: 4 10.7838 "10.783761 4"
Received: 5 5.74063 "5.740629 5"
Received: 6 6.98008 "6.980078 6"
Received: 7 11.6643 "11.664257 7"
Received: 8 3.80561 "3.805614 8"
Received: 9 7.79641 "7.796408 9"
std::runtime_error no more messages
no more messages
我正在尝试使用 boost::serialization、boost::archive 和拆分成员函数(split member,即 load 和 save),通过 boost::interprocess::message_queue 发送一个类的对象。
问题是,当我尝试反序列化时,会得到 input stream error 异常。
#include <iostream>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/version.hpp>
#include <random>
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/serialization/access.hpp>
#include <boost/archive/impl/basic_text_oarchive.ipp>
#include <boost/archive/impl/text_oarchive_impl.ipp>
#include <boost/archive/impl/text_iarchive_impl.ipp>
#include <boost/serialization/split_member.hpp>
/// Message payload serialized with Boost.Serialization through split
/// save()/load() member functions.
class Data{
public:
    // Value-initialized so that an object whose load() failed (or never ran)
    // can still be printed without reading indeterminate values.
    int a_{};
    double b_{};
    std::string s;

    /// Forwards to save() or load() via boost::serialization::split_member.
    /// @param ar       archive being written to or read from
    /// @param version  class version passed through to save()/load()
    template<class Archive>
    void serialize(Archive & ar, const unsigned int version) {
        boost::serialization::split_member(ar, *this, version);
    }

    /// Writes a_, b_ and s to the archive, in that order.
    template<class Archive> void save(Archive & ar, unsigned int /*version*/) const {
        ar << a_;
        ar << b_;
        ar << s;
    }

    /// Reads a_, b_ and s from the archive, in the same order save() wrote them.
    template<class Archive> void load(Archive & ar, unsigned int /*version*/) {
        ar >> a_;
        ar >> b_;
        ar >> s;
    }
private:
    // boost::serialization::access is the class the library uses to reach
    // serialize()/save()/load(); with everything public this friend
    // declaration is optional.
    friend class boost::serialization::access;
};
// Receiver side of the example: opens the queue named "message_queue" and, in
// an endless loop, receives a message into a 150-byte string, prints it,
// appends it to a string stream and tries to deserialize a Data from that
// stream, printing any exception text and then the fields of d.
//
// Returns: both catch handlers execute `return 1;`, which converts to true in
// this bool function, so every return path yields true.
//
// NOTE(review): the while(true) loop contains no break, so the statements after
// the try/catch are reached only if an exception handler does not return.
[[nodiscard]]bool RunChiled() {
using namespace boost::interprocess; try {
// open_only: the queue is opened, not created, here.
message_queue mq(open_only //open or create
, "message_queue" //name
);
unsigned int priority = 0;
message_queue::size_type recvd_size;
Data d;
// NOTE(review): iss lives outside the loop, so each received message is
// appended to whatever the previous iterations left in the stream.
std::stringstream iss;
std::string serialized_string;
serialized_string.resize(150);
// NOTE(review): number is incremented below but never read.
long long number = 0;
while(true)
{
// NOTE(review): recvd_size is filled in by receive() but never used; the
// string keeps its size of 150, so all 150 characters are printed and
// inserted into iss regardless of how many bytes arrived.
mq.receive(&serialized_string[0], 150, recvd_size , priority);
std::cout << serialized_string << "\n";
iss << serialized_string;
try{
boost::archive::text_iarchive ia(iss); // <-- getting the exception
ia >> d;
} catch (const std::exception& ex) {
// Deserialization failures are reported and the loop keeps going.
std::cout << ex.what() << "\n";
}
number++;
std::cout << d.a_ << " " << d.b_ << " " << d.s << "\n";
} }catch(const interprocess_exception &ex) {
message_queue::remove("message_queue");
std::cout << "interprocess_exception " << ex.what() << std::endl;
return 1; } catch (const std::exception& e) {
std::cout << "exception " << e.what() << std::endl;
message_queue::remove("message_queue");
return 1; }
message_queue::remove("message_queue"); return true; }
// Entry point of the example: removes any queue named "message_queue", forks,
// then runs RunChiled() in the branch where fork() returned a positive pid and
// creates the queue and sends one serialized Data in the branch where it
// returned 0.
//
// NOTE(review): generator and distribution are declared but not used in this
// excerpt.
int main() { std::cout << "1\n"; std::default_random_engine generator; std::uniform_real_distribution<double> distribution(0,15);
using namespace std;
cout << "Boost version: " << BOOST_LIB_VERSION << endl; using namespace boost::interprocess; message_queue::remove("message_queue"); auto pid = fork();
if(pid > 0) {
std::cout << "2\n";
// Waits two seconds before opening the queue (presumably to let the other
// process create it first - confirm).
sleep(2);
try {
auto res = RunChiled();
std::cout << res;
} catch (...) {
std::cout << "error\n";
}
} else if(pid == 0) {
try{
// Queue created with a maximum of 100 messages of at most 150 bytes each.
boost::interprocess::message_queue mq(create_only,"message_queue", 100, 150);
std::stringstream oss;
Data request;
request.b_ = 17.5;
// NOTE(review): `I` and `i` are not declared anywhere in this excerpt, and the
// closing brace before the final catch has no visible opener - presumably a
// `for` loop header was lost when the code was pasted; confirm against the
// original post.
request.a_ = I;
request.s = to_string(17.5) + " " + to_string(i);
try {
boost::archive::text_oarchive oa(oss);
oa << request;
} catch (const std::exception& e) {
std::cout << "serialzation:" << e.what() ;
}
try{
// std::cout << "oss " << oss.str() << "\n";
std::string serialized_string(oss.str());
std::cout << "serialized_string " << oss.str().size() << "\n";
// NOTE(review): &serialized_string is the address of the std::string object
// itself, not of its character data (serialized_string.data()), so the bytes
// handed to send() are not the serialized archive text.
mq.send(&serialized_string, serialized_string.size(), 0);
}catch(const std::exception& e){
std::cout << "\n send exeption " << e.what() << "\n";
}
}
}catch (const std::exception& e){
message_queue::remove("message_queue");
std::cout << e.what() ;
}
}
return 0;
}
一些大问题。
首先
mq.send(&serialized_string, serialized_string.size(), 0);
这是未定义行为,因为 serialized_string 不是 POD 类型(&serialized_string 是 std::string 对象本身的地址,而不是其字符缓冲区的地址),而且大小也不匹配。你的本意可能是:
mq.send(serialized_string.data(), serialized_string.size(), 0);
在接收方:您将 serialized_string 的大小调整为 150,之后却再也没有改回消息的实际大小。这意味着它无法正常工作,因为末尾会带有多余的数据。
修复方法:
// Receive into a buffer of buffer_size NUL characters, then shrink the buffer
// to the number of bytes that receive() reported.
std::string buffer(buffer_size, '\0');
unsigned int priority = 0;
message_queue::size_type recvd_size = 0;
mq.receive(buffer.data(), buffer.size(), recvd_size, priority);
buffer.resize(recvd_size);
其他注意事项
不拆分序列化可以更简单:
// The same payload with one serialize() member in place of the separate
// save()/load() pair.
class Data {
public:
    int a_;
    double b_;
    std::string s;

    // Applies the archive's operator& to a_, b_ and s, in that order.
    template <class Ar>
    void serialize(Ar& archive, unsigned /*version*/)
    {
        archive & a_;
        archive & b_;
        archive & s;
    }

private:
    friend class boost::serialization::access; // not required
};
其他说明
返回 bool 的 RunChiled 中同时出现 return 1 和 return true,看起来很可疑——其中之一可能是错误。
对于一个通常不会返回的函数,[[nodiscard]] 似乎不太合适([[noreturn]] 可能更贴切,并可选择让异常向外传播?)。
你应该用真正随机的值来为随机数生成器设定种子:
// Seed the engine with a value drawn from std::random_device instead of
// default-constructing it.
std::default_random_engine generator { std::random_device{}() };
修复后的现场演示
#include <iostream>
#include <iomanip>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <boost/core/demangle.hpp>
#include <boost/version.hpp>
#include <random>
#include <thread> // this_thread
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/serialization/access.hpp>
#include <boost/serialization/split_member.hpp>
namespace {
    // File-local aliases, constants and helpers used by the functions below.
    using namespace std::chrono_literals;

    namespace bip = boost::interprocess;
    using bip::message_queue;

    // Name of the queue, its capacity in messages, and the per-message byte limit.
    constexpr char const* queuename = "message_queue";
    constexpr int max_queued = 5;
    constexpr int buffer_size = 150;

    // Thin wrapper so callers can write sleep_for(2s).
    auto sleep_for = [](auto duration) { std::this_thread::sleep_for(duration); };
}
/// Payload exchanged over the queue: an int, a double and a string, all
/// starting out zero/empty.
class Data {
public:
    int a_ = 0;
    double b_ = 0.0;
    std::string s;

    /// Applies the archive's operator& to a_, b_ and s, in that order.
    /// The version argument is ignored.
    template <class Archive>
    void serialize(Archive& archive, unsigned /*version*/)
    {
        archive & a_;
        archive & b_;
        archive & s;
    }
};
/// Receiver loop: opens the existing queue, then repeatedly receives one
/// message, deserializes it into a Data and prints its fields.
///
/// Never returns normally: the loop has no exit, so control leaves only by
/// exception. Any std::exception is logged with its demangled type name, the
/// queue is removed, and the exception is re-thrown to the caller.
[[noreturn]] void RunChild()
{
    std::cout << "2" << std::endl;
    sleep_for(2s);
    try {
        message_queue mq(bip::open_only, queuename);
        // Each iteration starts with a buffer of buffer_size bytes; it is
        // shrunk to the received size below and grown back by the loop step.
        for (std::string buffer(buffer_size, '\0');; buffer.resize(buffer_size)) {
            {
                unsigned int priority = 0;
                message_queue::size_type recvd_size = 0;
#ifdef COLIRU
                // make the process terminate for online compiler
                {
                    using namespace boost::posix_time;
                    auto deadline = second_clock::universal_time() + seconds(3);
                    if (not mq.timed_receive(buffer.data(), buffer.size(),
                                             recvd_size, priority, deadline))
                    {
                        throw std::runtime_error("no more messages");
                    }
                }
#else
                mq.receive(buffer.data(), buffer.size(), recvd_size, priority);
#endif
                // Keep only the bytes that were actually received.
                buffer.resize(recvd_size);
            }

            Data d;
            try {
                // A fresh stream per message, holding exactly that message.
                std::stringstream iss(buffer);
                boost::archive::text_iarchive ia(iss);
                ia >> d;
            } catch (const std::exception& ex) {
                // A message that fails to deserialize is reported; d keeps its
                // default values and is still printed below.
                std::cout << ex.what() << std::endl;
            }

            std::cout << "Received: " << d.a_ << " " << d.b_ << " "
                      << std::quoted(d.s) << std::endl;
        }
    } catch (const std::exception& e) {
        std::cout << boost::core::demangle(typeid(e).name()) << " " << e.what()
                  << std::endl;
        message_queue::remove(queuename);
        throw; // re-raise
    }
}
static void RunParent()
{
std::cout << "1" << std::endl;
std::default_random_engine generator { std::random_device{}() };
std::uniform_real_distribution<double> distribution(0, 15);
message_queue mq(bip::create_only, queuename, max_queued, buffer_size);
for (auto i = 0; i < 10; ++i) {
auto value = distribution(generator);
Data const request {
i,
value,
std::to_string(value) + " " + std::to_string(i)
};
std::stringstream oss;
try {
boost::archive::text_oarchive oa(oss);
oa << request;
std::string buffer = std::move(oss).str();
std::cout << "Sending " << buffer.size() << " bytes" << std::endl;
mq.send(buffer.data(), buffer.size(), 0);
} catch (const std::exception& e) {
std::cout << "\nsend exeption " << e.what() << std::endl;
}
}
}
/// Prints the Boost version, removes any existing queue with this name, then
/// forks: the branch where fork() returns a positive pid runs RunChild() (the
/// receiver) and the branch where it returns 0 runs RunParent() (the sender).
/// Returns 1 if fork() fails; exceptions escaping either branch are printed
/// and the queue is removed.
int main() {
    std::cout << "Boost version: " << BOOST_LIB_VERSION << std::endl;
    message_queue::remove(queuename);
    try {
        auto const pid = ::fork();
        if (pid > 0) {
            RunChild();
        } else if (pid == 0) {
            RunParent();
        } else {
            // fork() reports failure with a negative return value; no second
            // process exists, so there is nothing to run.
            std::cerr << "fork failed" << std::endl;
            return 1;
        }
    } catch (const std::exception& e) {
        std::cout << e.what() << std::endl;
        message_queue::remove(queuename);
    }
}
输出
Boost version: 1_75
2
1
Sending 72 bytes
Sending 73 bytes
Sending 72 bytes
Sending 72 bytes
Sending 73 bytes
Sending 72 bytes
Sending 72 bytes
Received: 0 1.12642 "1.126420 0"
Received: 1 14.2474 "14.247412 1"
Received: 2 3.22163 "3.221631 2"
Sending 73 bytes
Sending 72 bytes
Received: 3 3.20471 "3.204709 3"
Sending 72 bytes
Received: 4 10.7838 "10.783761 4"
Received: 5 5.74063 "5.740629 5"
Received: 6 6.98008 "6.980078 6"
Received: 7 11.6643 "11.664257 7"
Received: 8 3.80561 "3.805614 8"
Received: 9 7.79641 "7.796408 9"
std::runtime_error no more messages
no more messages