使用 MsgPack 通过 ZeroMQ (zmqpp) 发送数据会出现 'msgpack::v1::insufficient_bytes' 错误
Sending data through ZeroMQ (zmqpp) using MsgPack gives 'msgpack::v1::insufficient_bytes' error
我使用 zmqpp and now I want to send data from the publisher to the subscribers using the header-only, C++11 version of msgpack-c 建立了一个 PUB/SUB 连接。
发布者必须发送 2 个 int64_t
号码 -- header_1
和 header_2
-- 然后是 std::vector<T>
-- data
--,其中 T
由 (header_1, header_2)
组合决定。
既然没有那么多关于如何组合 msgpack 和 zmqpp 的例子,我想出的想法是使用 msgpack zmqpp::message::add/add_raw
. Each part would be packed/unpacked 发送一个由 3 部分组成的消息。
发布者打包单个数据部分如下:
zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());
然后接收方像这样解包:
zmqpp::message msg;
subscriberSock.receive(msg);
int64_t header_1;
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(0)),
msg.size(0));
unpackedData.get().convert(&header_1);
当我 运行 代码时,我在订阅者端收到以下错误:
terminate called after throwing an instance of 'msgpack::v1::insufficient_bytes'
what(): insufficient bytes
Aborted
此外,zmqpp 似乎生成了一个由 5 部分组成的消息,即使我只调用了 3 次 add()
。
Q1:我packing/unpacking数据正确吗?
问题 2:这是使用 zmqpp 发送 msgpack 缓冲区的正确方法吗?
以下是代码的重要部分:
出版商
zmqpp::socket publisherSock;
/* connection setup stuff ...*/
// forever send data to the subscribers
while(true)
{
zmqpp::message msg;
// meta info about the data
int64_t header_1 = 1234567;
int64_t header_2 = 89;
// sample data
std::vector<double> data;
data.push_back(1.2);
data.push_back(3.4);
data.push_back(5.6);
{
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());
cout << "header_1:" << header_1 << endl; // header_1:1234567
}
{
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_2);
msg.add(buffer.data(), buffer.size());
cout << "header_2:" << header_2 << endl; // header_2:89
}
{
msgpack::sbuffer buffer;
msgpack::pack(buffer, data);
msg.add_raw(buffer.data(), buffer.size());
std::cout << "data: " << data << std::endl; // data:[1.2 3.4 5.6]
}
std::cout << msg.parts() << " parts" << std::endl; // prints "5 parts"... why ?
publisherSock.send(msg);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
订阅者
zmqpp::socket subscriberSock;
/* connection setup stuff ...*/
zmqpp::message msg;
subscriberSock.receive(msg);
int64_t header_1;
int64_t header_2;
std::vector<double> data;
std::cout << msg.parts() << " parts" << std::endl; // prints "5 parts"
{
// header 1
{
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(0)),
msg.size(0));
unpackedData.get().convert(&header_1);
cout << "header_1:" << header_1 << endl;
}
// header 2
{
msgpack::unpacked unpackedData;
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(1)),
msg.size(1));
unpackedData.get().convert(&header_2);
cout << "header_2:" << header_2 << endl;
}
// data
{
msgpack::unpacked unpacked_data;
msgpack::unpack(unpacked_data,
static_cast<const char*>(msg.raw_data(2)),
msg.size(2));
unpacked_data.get().convert(&data);
std::cout << "data:" << data << std::endl;
}
}
编辑: 问题已解决:正如@Jens 所指出的,packing/sending 数据的正确方法是使用 zmqpp::message::add_raw()
zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add_raw(buffer.data(), buffer.size());
我认为对 msg.add(buffer.data(), buffer.size()
的调用不会添加 buffer.size()
字节的数组,而是调用 message::add(Type const& part, Args &&...args)
,
msg << buffer.data()
,这可能会调用 message::operator<<(bool)
,因为指针转换为 bool
add(buffer.size())
然后调用 msg << buffer.size()
,它添加一个 size_t
值作为下一部分。
查看 zmqpp::message class,使用 message::add_raw 应该可以解决问题。
PS: 这一切都没有任何保证,因为我从未使用过zmqpp或msgpack。
我使用 zmqpp and now I want to send data from the publisher to the subscribers using the header-only, C++11 version of msgpack-c 建立了一个 PUB/SUB 连接。
发布者必须发送 2 个 int64_t
号码 -- header_1
和 header_2
-- 然后是 std::vector<T>
-- data
--,其中 T
由 (header_1, header_2)
组合决定。
既然没有那么多关于如何组合 msgpack 和 zmqpp 的例子,我想出的想法是使用 msgpack zmqpp::message::add/add_raw
. Each part would be packed/unpacked 发送一个由 3 部分组成的消息。
发布者打包单个数据部分如下:
zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());
然后接收方像这样解包:
zmqpp::message msg;
subscriberSock.receive(msg);
int64_t header_1;
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(0)),
msg.size(0));
unpackedData.get().convert(&header_1);
当我 运行 代码时,我在订阅者端收到以下错误:
terminate called after throwing an instance of 'msgpack::v1::insufficient_bytes'
what(): insufficient bytes
Aborted
此外,zmqpp 似乎生成了一个由 5 部分组成的消息,即使我只调用了 3 次 add()
。
Q1:我packing/unpacking数据正确吗?
问题 2:这是使用 zmqpp 发送 msgpack 缓冲区的正确方法吗?
以下是代码的重要部分:
出版商
zmqpp::socket publisherSock;
/* connection setup stuff ...*/
// forever send data to the subscribers
while(true)
{
zmqpp::message msg;
// meta info about the data
int64_t header_1 = 1234567;
int64_t header_2 = 89;
// sample data
std::vector<double> data;
data.push_back(1.2);
data.push_back(3.4);
data.push_back(5.6);
{
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());
cout << "header_1:" << header_1 << endl; // header_1:1234567
}
{
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_2);
msg.add(buffer.data(), buffer.size());
cout << "header_2:" << header_2 << endl; // header_2:89
}
{
msgpack::sbuffer buffer;
msgpack::pack(buffer, data);
msg.add_raw(buffer.data(), buffer.size());
std::cout << "data: " << data << std::endl; // data:[1.2 3.4 5.6]
}
std::cout << msg.parts() << " parts" << std::endl; // prints "5 parts"... why ?
publisherSock.send(msg);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
订阅者
zmqpp::socket subscriberSock;
/* connection setup stuff ...*/
zmqpp::message msg;
subscriberSock.receive(msg);
int64_t header_1;
int64_t header_2;
std::vector<double> data;
std::cout << msg.parts() << " parts" << std::endl; // prints "5 parts"
{
// header 1
{
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(0)),
msg.size(0));
unpackedData.get().convert(&header_1);
cout << "header_1:" << header_1 << endl;
}
// header 2
{
msgpack::unpacked unpackedData;
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(1)),
msg.size(1));
unpackedData.get().convert(&header_2);
cout << "header_2:" << header_2 << endl;
}
// data
{
msgpack::unpacked unpacked_data;
msgpack::unpack(unpacked_data,
static_cast<const char*>(msg.raw_data(2)),
msg.size(2));
unpacked_data.get().convert(&data);
std::cout << "data:" << data << std::endl;
}
}
编辑: 问题已解决:正如@Jens 所指出的,packing/sending 数据的正确方法是使用 zmqpp::message::add_raw()
zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add_raw(buffer.data(), buffer.size());
我认为对 msg.add(buffer.data(), buffer.size()
的调用不会添加 buffer.size()
字节的数组,而是调用 message::add(Type const& part, Args &&...args)
,
msg << buffer.data()
,这可能会调用message::operator<<(bool)
,因为指针转换为 booladd(buffer.size())
然后调用msg << buffer.size()
,它添加一个size_t
值作为下一部分。
查看 zmqpp::message class,使用 message::add_raw 应该可以解决问题。
PS: 这一切都没有任何保证,因为我从未使用过zmqpp或msgpack。