使用流处理接收到的数据
Using stream to treat received data
我正在从套接字接收消息。
套接字包含在 header(基本上是消息的大小)和作为 crc 的页脚(一种检查消息是否未损坏的代码)
因此,布局类似于:
size (2 bytes) | message (240 bytes) | crc (4 byte)
我写了一个operator>>
operator>>
如下:
std::istream &operator>>(std::istream &stream, Message &msg) {
std::int16_t size;
stream >> size;
stream.read(reinterpret_cast<char*>(&msg), size);
// Not receive enough data
if (stream.rdbuf()->in_avail() < dataSize + 4) {
stream.setstate(std::ios_base::failbit);
return stream;
}
std::int16_t gotCrc;
stream >> gotCrc;
// Data not received correctly
if(gotCrc != computeCrc(msg)) {
stream.setstate(std::ios_base::failbit);
}
return stream;
}
消息可以逐字节到达,也可以全部到达。我们甚至可以一次收到多条消息。
基本上,我所做的是这样的:
struct MessageReceiver {
std::string totalDataReceived;
void messageArrived(std::string data) {
// We add the data to totaldataReceived
totalDataReceived += data;
std::stringbuf buf(totalDataReceived);
std::istream stream(&buf);
std::vector<Message> messages(
std::istream_iterator<Message>(stream),
std::istream_iterator<Message>{});
std::for_each(begin(messages), end(messages), processMessage);
// +4 for crc and + 2 for the size to remove
auto sizeToRemove = [](auto init, auto message) {return init + message.size + 4 + 2;};
// remove the proceed messages
totalDataReceived.remove(accumulate(begin(messages), end(messages), 0, sizeToRemove);
}
};
所以基本上,我们接收数据,我们将它插入到接收到的数据的总数组中。我们流式传输它,如果我们至少收到一条消息,我们将其从缓冲区 totalDataReceived
.
中删除
但是,我不确定这是个好方法。实际上,当计算错误的 crc 时,此代码不起作用......(未创建消息,因此我们不对其进行迭代)。所以每次,我都会尝试阅读带有错误 crc 的消息...
我该怎么做?我无法将所有数据保留在 totalDataReceived
中,因为在执行生命周期内我会收到很多消息。
我应该实现自己的 streambuf 吗?
我发现你想要创建的是一个 class,它的作用类似于 std::istream。当然你可以选择创建自己的class,但出于某些原因我更喜欢实现std::streambuf。
首先,使用您的 class 的人习惯使用它,因为如果您继承并实现 std::streambuf 和 std::istream,它的作用与 std::istream 相同。
其次,您不需要创建额外的方法或不需要重写运算符。他们已经准备好达到 std::istream 的 class 水平。
要实现 std::streambuf,您需要做的是继承它,覆盖 underflow() 并使用 setg() 设置获取指针。
我正在从套接字接收消息。 套接字包含在 header(基本上是消息的大小)和作为 crc 的页脚(一种检查消息是否未损坏的代码)
因此,布局类似于:
size (2 bytes) | message (240 bytes) | crc (4 byte)
我写了一个operator>>
operator>>
如下:
std::istream &operator>>(std::istream &stream, Message &msg) {
std::int16_t size;
stream >> size;
stream.read(reinterpret_cast<char*>(&msg), size);
// Not receive enough data
if (stream.rdbuf()->in_avail() < dataSize + 4) {
stream.setstate(std::ios_base::failbit);
return stream;
}
std::int16_t gotCrc;
stream >> gotCrc;
// Data not received correctly
if(gotCrc != computeCrc(msg)) {
stream.setstate(std::ios_base::failbit);
}
return stream;
}
消息可以逐字节到达,也可以全部到达。我们甚至可以一次收到多条消息。
基本上,我所做的是这样的:
struct MessageReceiver {
std::string totalDataReceived;
void messageArrived(std::string data) {
// We add the data to totaldataReceived
totalDataReceived += data;
std::stringbuf buf(totalDataReceived);
std::istream stream(&buf);
std::vector<Message> messages(
std::istream_iterator<Message>(stream),
std::istream_iterator<Message>{});
std::for_each(begin(messages), end(messages), processMessage);
// +4 for crc and + 2 for the size to remove
auto sizeToRemove = [](auto init, auto message) {return init + message.size + 4 + 2;};
// remove the proceed messages
totalDataReceived.remove(accumulate(begin(messages), end(messages), 0, sizeToRemove);
}
};
所以基本上,我们接收数据,我们将它插入到接收到的数据的总数组中。我们流式传输它,如果我们至少收到一条消息,我们将其从缓冲区 totalDataReceived
.
但是,我不确定这是个好方法。实际上,当计算错误的 crc 时,此代码不起作用......(未创建消息,因此我们不对其进行迭代)。所以每次,我都会尝试阅读带有错误 crc 的消息...
我该怎么做?我无法将所有数据保留在 totalDataReceived
中,因为在执行生命周期内我会收到很多消息。
我应该实现自己的 streambuf 吗?
我发现你想要创建的是一个 class,它的作用类似于 std::istream。当然你可以选择创建自己的class,但出于某些原因我更喜欢实现std::streambuf。
首先,使用您的 class 的人习惯使用它,因为如果您继承并实现 std::streambuf 和 std::istream,它的作用与 std::istream 相同。
其次,您不需要创建额外的方法或不需要重写运算符。他们已经准备好达到 std::istream 的 class 水平。
要实现 std::streambuf,您需要做的是继承它,覆盖 underflow() 并使用 setg() 设置获取指针。