从固定大小的字节缓冲区的连续块中解析 protobuf 消息序列

Parse sequences of protobuf messages from continguous chunks of fixed sized byte buffer

由于我对 C++ 的了解不足,我已经为此苦苦挣扎了两天。我需要做的是使用 protobuf C++ API 从一个大文件中解析消息序列,该文件可能包含数百万条此类消息。直接从文件中读取很容易,因为我总是可以只执行 "ReadVarInt32" 来获取大小,然后执行 ParseFromCodedStream 并在 CodedInputStream 上推送限制,如 this post 中所述。但是,我正在使用的 I/O 级别 API(实际上是 libuv)需要为每个读取回调操作分配固定大小的缓冲区。显然,块大小与我正在读取的消息大小无关。

这让我的生活很艰难。基本上每次我从文件中读取并填充固定大小的缓冲区(比如 16K)时,该缓冲区可能包含数百条完整的 protobuf 消息,但该缓冲区的最后一块可能是一条不完整的消息。所以我想,好吧,我应该做的是尝试读取尽可能多的消息,最后,提取最后一个块并将其附加到我读出的下一个 16K 缓冲区的开头,继续直到我到达 EOF文件。我使用 ReadVarInt32() 获取大小,然后将该数字与缓冲区大小的其余部分进行比较,如果消息大小较小,则继续阅读。

有一个叫GetDirectBufferPointer的API,所以我试图用这个来记录指针位置之前我什至读出了下一条消息的尺寸。但是我怀疑由于字节序的原因,如果我只是从指针开始的地方提取字节数组的其余部分并附加到下一个块,Parse 将不会成功,实际上前几个字节(我认为是 8 个)完全搞砸了.

或者,如果我执行 codedStream.ReadRaw() 并将剩余流写入缓冲区,然后附加到新块的头部,数据将不会损坏。但问题是这次我将丢失 "size" 字节信息,因为它已经在 "ReadVarInt32" 中 "read"!而且就算我直接记住上次读到的size信息,下次迭代直接调用message.ParseFromCodedStream(),结果还是少读了一个字节,甚至有部分损坏,无法成功恢复对象.

std::vector<char> mCheckBuffer;
std::vector<char> mResidueBuffer;
char bResidueBuffer[READ_BUFFER_SIZE];
char temp[READ_BUFFER_SIZE];
google::protobuf::uint32 size;
//"in" is the file input stream
while (in.good()) {
    in.read(mReadBuffer.data(), READ_BUFFER_SIZE);
    mCheckBuffer.clear();
    //merge the last remaining chunk that contains incomplete message with
    //the new data chunk I got out from buffer. Excuse my terrible C++ foo
    std::merge(mResidueBuffer.begin(), mResidueBuffer.end(),  
    mReadBuffer.begin(), mReadBuffer.end(), std::back_inserter(mCheckBuffer));

    //Treat the new merged buffer array as the new CIS
    google::protobuf::io::ArrayInputStream ais(&mCheckBuffer[0], 
    mCheckBuffer.size());
    google::protobuf::io::CodedInputStream cis(&ais);
    //Record the pointer location on CIS in bResidueBuffer
    cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
    &bResidueBufSize);

    //No size information, probably first time or last iteration  
    //coincidentally read a complete message out. Otherwise I simply 
    //skip reading size again as I've already populated that from last 
    //iteration when I got an incomplete message
    if(size == 0) {
         cis.ReadVarint32(&size);
    }
    //Have to read this again to get remaining buffer size
    cis.GetDirectBufferPointer((const void**)&temp, &mResidueBufSize);

    //Compare the next message size with how much left in the buffer, if      
    //message size is smaller, I know I can read at least one more message 
    //out, keep reading until I run out of buffer, or, it's the end of message 
    //and my buffer just allocated larger so size should be 0
    while (size <= mResidueBufSize && size != 0) {
        //If this cis I constructed didn't have the size info at the beginning, 
        //and I just read straight from it hoping to get the message out from 
        //the "size" I got from last iteration, it simply doesn't work
        //(read one less byte in fact, and some part of the message corrupted)
        //push the size constraint to the input stream;
        int limit = cis.PushLimit(size);
        //parse message from the input stream
        message.ParseFromCodedStream(&cis);  
        cis.PopLimit(limit);
        google::protobuf::TextFormat::PrintToString(message, &str);
        printf("%s", str.c_str());
        //do something with the parsed object
        //Now I have to record the new pointer location again
        cis.GetDirectBufferPointer((const void**)&bResidueBuffer, 
        &bResidueBufSize);
        //Read another time the next message's size and go back to while loop check
        cis.ReadVarint32(&size);

    }
    //If I do the next line, bResidueBuffer will have the correct CIS information 
    //copied over, but not having the "already read" size info
    cis.ReadRaw(bResidueBuffer, bResidueBufSize);
    mResidueBuffer.clear();
    //I am constructing a new vector that receives the residual chunk of the 
    //current buffer that isn't enough to restore a message
    //If I don't do ReadRaw, this copy completely messes up at least the first 8 
    //bytes of the copied buffer's value, due to I suspect endianness
    mResidueBuffer.insert(mResidueBuffer.end(), &bResidueBuffer[0], 
    &bResidueBuffer[bResidueBufSize]);
}

我现在真的是一头雾水。甚至可以优雅地将 protobuf 与 APIs 一起使用,它根本需要固定大小的中间缓冲区?非常感谢任何输入,谢谢!

我发现您的代码存在两个主要问题:

std::merge(mResidueBuffer.begin(), mResidueBuffer.end(),  
mReadBuffer.begin(), mReadBuffer.end(), std::back_inserter(mCheckBuffer));

看起来您希望 std::merge 连接您的缓冲区,但实际上此函数执行合并排序意义上的两个排序数组合并为单个排序数组。这在这种情况下没有任何意义; mCheckBuffer 最终将包含废话。

cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
&bResidueBufSize);

此处您将 ​​&bResidueBuffer 转换为不兼容的指针类型。 bResidueBuffer是char数组,所以&bResidueBuffer是指向char数组的指针,也就是不是指向指针的指针。这确实令人困惑,因为数组可以隐式转换为指针(指针指向数组的第一个元素),但这实际上是一种转换——bResidueBuffer 本身是 而不是 一个指针,它可以转换为一个。

我认为您也误解了 GetDirectBufferPointer() 的作用。看起来您希望它将缓冲区的其余部分复制到 bResidueBuffer,但该方法从不复制任何数据。该方法返回一个指向原始缓冲区的指针。

正确的调用方式如下:

const void* ptr;
int size;
cis.GetDirectBufferPointer(&ptr, &size);

现在 ptr 将指向原始缓冲区。您现在可以将其与指向缓冲区开头的指针进行比较,以找出您在流中的位置,例如:

size_t pos = (const char*)ptr - &mCheckBuffer[0];

但是,您不应该那样做,因为 CodedInputStream 已经有了用于此目的的方法 CurrentPosition()。这将 return 缓冲区中的当前字节偏移量。所以,改用它。

好的,感谢 Kenton 的帮助指出了我问题中的主要问题,我现在已经修改了代码段并测试了它的工作情况。我将 post 我的解决方案放在这里。然而,话虽如此,我对我需要在这里进行的所有复杂性和边缘情况检查并不满意。我认为这很容易出错。即使这样,我可能真正要做的是在我的 libuv 主线程之外的另一个线程中编写我的直接 "read from stream" 阻塞调用,这样我就不会得到必须使用 libuv API 的要求。但为了完整起见,这是我的代码:

std::vector<char> mCheckBuffer;
std::vector<char> mResidueBuffer;
std::vector<char> mReadBuffer(READ_BUFFER_SIZE);
google::protobuf::uint32 size;
//"in" is the file input stream
while (in.good()) {
    //This part is tricky as you're not guaranteed that what end up in 
    //mReadBuffer is everything you read out from the file. The same 
    //happens with libuv's assigned buffer, after EOF, what's rest in 
    //the buffer could be anything
    in.read(mReadBuffer.data(), READ_BUFFER_SIZE);
    //merge the last remaining chunk that contains incomplete message with
    //the new data chunk I got out from buffer. I couldn't find a more 
    //efficient way doing that
    mCheckBuffer.clear();
    mCheckBuffer.reserve(mResidueBuffer.size() + mReadBuffer.size());
    mCheckBuffer.insert(mCheckBuffer.end(), mResidueBuffer.begin(),
    mResidueBuffer.end());
    mCheckBuffer.insert(mCheckBuffer.end(), mReadBuffer.begin(),
    mReadBuffer.end());
    //Treat the new merged buffer array as the new CIS
    google::protobuf::io::ArrayInputStream ais(&mCheckBuffer[0], 
    mCheckBuffer.size());
    google::protobuf::io::CodedInputStream cis(&ais);
    //No size information, probably first time or last iteration  
    //coincidentally read a complete message out. Otherwise I simply 
    //skip reading size again as I've already populated that from last 
    //iteration when I got an incomplete message
    if(size == 0) {
        cis.ReadVarint32(&size);
    }
    bResidueBufSize = mCheckBuffer.size() - cis.CurrentPosition();
    //Compare the next message size with how much left in the buffer, if      
    //message size is smaller, I know I can read at least one more message 
    //out, keep reading until I run out of buffer. If, it's the end of message 
    //and size (next byte I read from stream) happens to be 0, that
    //will trip me up, cos when I push size 0 into PushLimit and then try 
    //parsing, it will actually return true even if it reads nothing. 
    //So I can get into an infinite loop, if I don't do the check here
    while (size <= bResidueBufSize && size != 0) {
        //If this cis I constructed didn't have the size info at the 
        //beginning, and I just read straight from it hoping to get the  
        //message out from the "size" I got from last iteration
        //push the size constraint to the input stream
        int limit = cis.PushLimit(size); 
        //parse the message from the input stream
        bool result = message.ParseFromCodedStream(&cis);  
        //Parse fail, it could be because last iteration already took care
        //of the last message and that size I read last time is just junk
        //I choose to only check EOF here when result is not true, (which
        //leads me to having to check for size=0 case above), cos it will
        //be too many checks if I check it everytime I finish reading a 
        //message out
        if(!result) {
            if(in.eof()) {
                log.info("Reached EOF, stop processing!");
                break;
            }
            else {
                log.error("Read error or input mal-formatted! Log error!");
                exit;
            }
        }
        cis.PopLimit(limit);
        google::protobuf::TextFormat::PrintToString(message, &str);
        //Do something with the message

        //This is when the last message read out exactly reach the end of 
        //the buffer and there is no size information available on the 
        //stream any more, in which case size will need to be reset to zero
        //so that the beginning of next iteration will read size info first
        if(!cis.ReadVarint32(&size)) {
            size = 0;
        }
        bResidueBufSize = mCheckBuffer.size() - cis.CurrentPosition();
    }
    if(in.eof()) {
        break;
    }
    //Now I am copying the residual buffer into the intermediate
    //mResidueBuffer, which will be merged with newly read data in next iteration
    mResidueBuffer.clear();
    mResidueBuffer.reserve(bResidueBufSize);
    mResidueBuffer.insert(mResidueBuffer.end(), 
    &mCheckBuffer[cis.CurrentPosition()],&mCheckBuffer[mCheckBuffer.size()]);
}
if(!in.eof()) {
    log.error("Something else other than EOF happened to the file, log error!");
    exit;
}