尝试使用 boost::serialization 和存档反序列化对象时得到 "input stream error"

Getting "input stream error" when trying to deserialize an object using boost::serialization and an archive

我正在尝试使用 boost::serialization、boost::archive 以及拆分成员(split member,即 load 和 save)在 boost::interprocess 的 message_queue 上发送一个类对象。问题是,当我尝试反序列化时,会得到 input stream error 异常。

#include <iostream>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>

#include <boost/version.hpp>
#include <random>


#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/serialization/access.hpp>
#include <boost/archive/impl/basic_text_oarchive.ipp>
#include <boost/archive/impl/text_oarchive_impl.ipp>
#include <boost/archive/impl/text_iarchive_impl.ipp>
#include <boost/serialization/split_member.hpp>


/// Payload sent over the message queue. Serialization is split into a
/// separate save() (writing) and load() (reading) member.
class Data {
public:
    int a_;
    double b_;
    std::string s;

    /// Archive entry point: forwards to boost::serialization::split_member,
    /// which selects save() or load() according to the archive type.
    template <class Ar>
    void serialize(Ar& archive, const unsigned int file_version) {
        boost::serialization::split_member(archive, *this, file_version);
    }

    /// Writes a_, b_ and s to the archive, in that order.
    template <class Ar>
    void save(Ar& archive, unsigned int /*file_version*/) const {
        archive << a_;
        archive << b_;
        archive << s;
    }

    /// Reads a_, b_ and s back in the same order save() wrote them.
    template <class Ar>
    void load(Ar& archive, unsigned int /*file_version*/) {
        archive >> a_;
        archive >> b_;
        archive >> s;
    }

private:
    friend class boost::archive::save_access;
};


// Receiver side: opens the already-existing queue "message_queue" and loops
// forever, printing each raw message and the Data decoded from it.
// The while(true) loop has no break, so the function only returns through the
// catch handlers at the bottom; both `return 1` statements there convert to
// `true` for the bool return type, and the trailing `return true` is never
// reached.
[[nodiscard]]bool RunChiled() { 
    using namespace boost::interprocess; try {
    message_queue mq(open_only   // open an existing queue only; never creates one
        , "message_queue"     // queue name
    );
    unsigned int priority = 0;
    message_queue::size_type recvd_size;
    // Declared outside the loop, so it keeps its previous contents whenever
    // deserialization fails.
    Data d;
    // One stream shared by every loop iteration; it is never cleared or reset.
    std::stringstream iss;
    std::string serialized_string;
    // Fixed 150-character receive buffer.
    serialized_string.resize(150);
    // Incremented once per iteration; the value is never read.
    long long number = 0;
    while(true)
    {
        // recvd_size is filled in by receive(), but the string is never
        // resized to it, so the buffer always stays 150 characters long.
        mq.receive(&serialized_string[0], 150, recvd_size , priority);
        std::cout << serialized_string << "\n";
        // Appends the entire 150-character buffer to the shared stream.
        iss << serialized_string;
        try{
          boost::archive::text_iarchive ia(iss);  // <-- getting the exception
          ia >> d;
        } catch  (const std::exception& ex) {
          // Deserialization errors are reported and the loop carries on.
          std::cout << ex.what() << "\n";
        }
        number++;
        std::cout << d.a_ << " " << d.b_ << " " << d.s << "\n";
      }   }catch(const interprocess_exception &ex) {
    // Queue-level failure: remove the queue and report.
    message_queue::remove("message_queue");
    std::cout << "interprocess_exception " << ex.what() << std::endl;
    return 1;   } catch (const std::exception& e)   {
    std::cout << "exception " << e.what() << std::endl;
    message_queue::remove("message_queue");
    return 1;   }

  message_queue::remove("message_queue");   return true; }


// Forks into two processes that talk over the Boost.Interprocess queue
// "message_queue": the process that sees pid > 0 waits two seconds and runs
// the receiver (RunChiled); the process that sees pid == 0 creates the queue
// and sends one serialized Data object.
// NOTE(review): as posted, this listing does not compile. `I` and `i` are used
// but never declared, and the `}` right after the send try/catch closes the
// outer try before its catch handler. Presumably a `for (... i ...) {` line
// was dropped after the queue is created; confirm against the original code.
int main() {   std::cout << "1\n";   std::default_random_engine generator;   std::uniform_real_distribution<double> distribution(0,15);

  using namespace std;
    // Remove any queue left over from an earlier run before forking.
    cout << "Boost version: " << BOOST_LIB_VERSION << endl;   using namespace boost::interprocess;   message_queue::remove("message_queue");   auto pid = fork();

  if(pid > 0)   {
    std::cout << "2\n";
    // Pause before opening the queue that the other process creates.
    sleep(2);
    try {
      auto res = RunChiled();
      std::cout << res;
    } catch (...) {
      std::cout << "error\n";
    } 
 } else if(pid == 0)   {
    try{    
        // Queue capacity: 100 messages of at most 150 bytes each.
        boost::interprocess::message_queue mq(create_only,"message_queue", 100, 150);
        std::stringstream oss;
        Data request;       
        request.b_ = 17.5;      
        request.a_ = I;         
        request.s = to_string(17.5) + " " + to_string(i);       
        try {           
            boost::archive::text_oarchive oa(oss);
            oa << request;
        } catch (const std::exception& e) {
            std::cout << "serialzation:" << e.what() ;
        }
        try{    
           // std::cout << "oss " << oss.str() << "\n";         
           std::string serialized_string(oss.str());
           std::cout << "serialized_string " << oss.str().size() << "\n";                   
           // Passes the address of the std::string object itself, not of its
           // character data, together with the length of the text.
           mq.send(&serialized_string, serialized_string.size(), 0); 
        }catch(const std::exception& e){
            std::cout << "\n send     exeption " << e.what() << "\n";
        } 
    }
    }catch (const std::exception& e){             
        message_queue::remove("message_queue");
        std::cout << e.what() ;
    }
}   
return 0;
 }

一些大问题。

  1. 首先

    mq.send(&serialized_string, serialized_string.size(), 0);
    

    这是未定义行为,因为 serialized_string 不是 POD,传入的大小也与之不匹配。你的本意可能是:

    mq.send(serialized_string.data(), serialized_string.size(), 0);
    
  2. 在接收方,您将 serialized_string 的大小调整为 150,之后却从未把它调整回实际收到的大小。这意味着它无法正常工作,因为消息后面会有尾随数据。

    正在修复:

    // Receive into a maximum-sized buffer, then trim it to the number of
    // bytes that receive() reported through recvd_size.
    std::string buffer(buffer_size, '\0');
    unsigned int priority = 0;
    message_queue::size_type recvd_size;
    
    mq.receive(buffer.data(), buffer.size(), recvd_size, priority);
    buffer.resize(recvd_size);
    

其他注意事项

不拆分序列化可以更简单:

// Same payload, with one serialize() member used for both saving and loading.
class Data {
public:
    int a_;
    double b_;
    std::string s;

    // Applies the archive's operator& to each field in declaration order.
    template <class Ar>
    void serialize(Ar& archive, unsigned /*version*/) {
        archive & a_;
        archive & b_;
        archive & s;
    }

private:
    // Not required here: every member above is already public.
    friend class boost::serialization::access;
};

其他说明

  • bool RunChiled 中既有 return 1 又有 return true,看起来很可疑——其中一个可能是错误

  • [[nodiscard]] 对于一个通常不会 return 的函数来说似乎有点棘手([[noreturn]] 可能更合适,可以选择将异常传递出去?)

  • 你会想用一些真正随机的东西来播种你的随机生成器:

     std::default_random_engine generator { std::random_device{}() };
    

修复后的现场演示

Live On Coliru

Live On Wandbox

#include <iostream>
#include <iomanip>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>

#include <boost/core/demangle.hpp>
#include <boost/version.hpp>
#include <random>
#include <thread> // this_thread

#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/serialization/access.hpp>
#include <boost/serialization/split_member.hpp>

namespace {
    // Short alias for Boost.Interprocess, plus the queue type used throughout.
    namespace bip = boost::interprocess;
    using bip::message_queue;

    // Name of the queue that is created, opened and removed in this file.
    constexpr const char* queuename = "message_queue";
    // Limits passed when the queue is created: message count and message size.
    constexpr int max_queued = 5;
    constexpr int buffer_size = 150;

    // Enables duration literals such as 2s.
    using namespace std::chrono_literals;
    // Shorthand for sleeping the current thread for the given duration.
    auto sleep_for = [](auto duration) { std::this_thread::sleep_for(duration); };
}

/// Message payload. All fields are public with default initializers, so the
/// class can be brace-initialized as {a_, b_, s}.
class Data {
public:
    int a_{};
    double b_{};
    std::string s;

    /// Applies the archive's operator& to a_, b_ and s, in that order; the
    /// version argument is ignored.
    template <class Archive>
    void serialize(Archive& archive, unsigned /*version*/) {
        archive & a_;
        archive & b_;
        archive & s;
    }
};

/// Receiver loop: opens the existing queue and prints every Data object that
/// arrives on it.
///
/// Never returns normally. The loop has no exit condition; any std::exception
/// that escapes it is logged with its demangled type name, the queue is
/// removed, and the exception is re-thrown to the caller.
[[noreturn]] void RunChild()
{
    std::cout << "2" << std::endl;
    // Pause before opening the queue, which another process has to create.
    sleep_for(2s);

    try {
        message_queue mq(bip::open_only, queuename);
        // The buffer is grown back to full size before each receive and then
        // trimmed to the bytes actually received.
        for (std::string buffer(buffer_size, '\0');; buffer.resize(buffer_size)) {
            {
                unsigned int priority = 0;
                message_queue::size_type recvd_size = 0;
#ifdef COLIRU
                // make the process terminate for online compiler
                {
                    using namespace boost::posix_time;
                    auto deadline = second_clock::universal_time() + seconds(3);
                    if (not mq.timed_receive(buffer.data(), buffer.size(),
                            recvd_size, priority, deadline))
                    {
                        throw std::runtime_error("no more messages");
                    }
                }
#else
                mq.receive(buffer.data(), buffer.size(), recvd_size, priority);
#endif
                buffer.resize(recvd_size);
            }

            Data d;
            try {
                // A fresh stream per message, so nothing carries over from
                // the previous one.
                std::stringstream iss(buffer);
                boost::archive::text_iarchive ia(iss);
                ia >> d;
            } catch (const std::exception& ex) {
                std::cout << ex.what() << std::endl;
                // Do not report a default-constructed Data as received.
                continue;
            }
            std::cout << "Received: " << d.a_ << " " << d.b_ << " "
                      << std::quoted(d.s) << std::endl;
        }
    } catch (const std::exception& e) {
        std::cout << boost::core::demangle(typeid(e).name()) << " " << e.what()
                  << std::endl;
        message_queue::remove(queuename);
        throw; // re-raise
    }
}

static void RunParent()
{
    std::cout << "1" << std::endl;
    std::default_random_engine generator { std::random_device{}() };
    std::uniform_real_distribution<double> distribution(0, 15);

    message_queue mq(bip::create_only, queuename, max_queued, buffer_size);

    for (auto i = 0; i < 10; ++i) {
        auto value = distribution(generator);
        Data const request {
            i,
            value,
            std::to_string(value) + " " + std::to_string(i)
        };

        std::stringstream oss;
        try {
            boost::archive::text_oarchive oa(oss);
            oa << request;

            std::string buffer = std::move(oss).str();
            std::cout << "Sending " << buffer.size() << " bytes" << std::endl;
            mq.send(buffer.data(), buffer.size(), 0);
        } catch (const std::exception& e) {
            std::cout << "\nsend exeption " << e.what() << std::endl;
        }
    }
}

/// Entry point: clears any stale queue, forks, and runs the receiver in one
/// process and the sender in the other.
///
/// Despite the names, RunChild() (the receiver) runs in the process where
/// fork() returned a positive pid, and RunParent() (the sender) runs in the
/// process where it returned 0.
///
/// @return 1 if fork() fails, 0 otherwise (including when an exception from
///         either side was caught and reported).
int main() {
    std::cout << "Boost version: " << BOOST_LIB_VERSION << std::endl;
    message_queue::remove(queuename);

    try {
        const auto pid = ::fork();
        if (pid < 0) {
            // Previously a failed fork fell through silently and did nothing.
            std::cout << "fork failed" << std::endl;
            return 1;
        }
        if (pid > 0) {
            RunChild();
        } else {
            RunParent();
        }
    } catch (const std::exception& e) {
        std::cout << e.what() << std::endl;
        message_queue::remove(queuename);
    }
 }

输出

Boost version: 1_75
2
1
Sending 72 bytes
Sending 73 bytes
Sending 72 bytes
Sending 72 bytes
Sending 73 bytes
Sending 72 bytes
Sending 72 bytes
Received: 0 1.12642 "1.126420 0"
Received: 1 14.2474 "14.247412 1"
Received: 2 3.22163 "3.221631 2"
Sending 73 bytes
Sending 72 bytes
Received: 3 3.20471 "3.204709 3"
Sending 72 bytes
Received: 4 10.7838 "10.783761 4"
Received: 5 5.74063 "5.740629 5"
Received: 6 6.98008 "6.980078 6"
Received: 7 11.6643 "11.664257 7"
Received: 8 3.80561 "3.805614 8"
Received: 9 7.79641 "7.796408 9"
std::runtime_error no more messages
no more messages