使用流处理接收到的数据

Using stream to treat received data

我正在从套接字接收消息。 套接字包含在 header(基本上是消息的大小)和作为 crc 的页脚(一种检查消息是否未损坏的代码)

因此,布局类似于:

size (2 bytes) | message (240 bytes) | crc (4 byte)

我写了一个operator>>

operator>>如下:

std::istream &operator>>(std::istream &stream, Message &msg) {
    std::int16_t size;
    stream >> size;
    stream.read(reinterpret_cast<char*>(&msg), size);

    // Not receive enough data
    if (stream.rdbuf()->in_avail() < dataSize + 4) {
        stream.setstate(std::ios_base::failbit);
        return stream;
    }

    std::int16_t gotCrc;
    stream >> gotCrc;

    // Data not received correctly
    if(gotCrc != computeCrc(msg)) {
        stream.setstate(std::ios_base::failbit);
    }
    return stream;
}

消息可以逐字节到达,也可以全部到达。我们甚至可以一次收到多条消息。

基本上,我所做的是这样的:

struct MessageReceiver {
    std::string totalDataReceived;
    void messageArrived(std::string data) {
        // We add the data to totaldataReceived
        totalDataReceived += data;

        std::stringbuf buf(totalDataReceived);
        std::istream stream(&buf);
        std::vector<Message> messages(
            std::istream_iterator<Message>(stream), 
            std::istream_iterator<Message>{});
        std::for_each(begin(messages), end(messages), processMessage);
        // +4 for crc and + 2 for the size to remove
        auto sizeToRemove = [](auto init, auto message) {return init + message.size + 4 + 2;};
        // remove the proceed messages
        totalDataReceived.remove(accumulate(begin(messages), end(messages), 0, sizeToRemove);
    }
};

所以基本上,我们接收数据,我们将它插入到接收到的数据的总数组中。我们流式传输它,如果我们至少收到一条消息,我们将其从缓冲区 totalDataReceived.

中删除

但是,我不确定这是个好方法。实际上,当计算错误的 crc 时,此代码不起作用......(未创建消息,因此我们不对其进行迭代)。所以每次,我都会尝试阅读带有错误 crc 的消息...

我该怎么做?我无法将所有数据保留在 totalDataReceived 中,因为在执行生命周期内我会收到很多消息。

我应该实现自己的 streambuf 吗?

我发现你想要创建的是一个 class,它的作用类似于 std::istream。当然你可以选择创建自己的class,但出于某些原因我更喜欢实现std::streambuf。

首先,使用您的 class 的人习惯使用它,因为如果您继承并实现 std::streambuf 和 std::istream,它的作用与 std::istream 相同。

其次,您不需要创建额外的方法或不需要重写运算符。他们已经准备好达到 std::istream 的 class 水平。

要实现 std::streambuf,您需要做的是继承它,覆盖 underflow() 并使用 setg() 设置获取指针。