Cumulative compression of a growing buffer (c++, zlib)

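I have a buffer (a string) that grows over time, and I need to send it through a channel with a limited input size (4096 bytes). Communication over this channel is costly, which is why it is better to send compressed data. The buffer grows in chunks of varying sizes. These chunks cannot be split without losing their meaning.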

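I currently use zlib in C++ for compression, with an arbitrary limit on the buffer size. When this limit is reached, the string is compressed and sent through the channel. This works, but it is not optimal: to avoid losing information, the limit has to be quite low (the channel input limit is 4096 bytes).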

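My idea is to use zlib to build a growing compressed buffer out of compressed chunks of different sizes, and to stop the process just before the channel input limit is reached. Does zlib allow working with compressed chunks of different sizes, or do I need another algorithm?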

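The simplest solution is to convert the out-of-band packet description into an in-band format. By far the easiest case is when your input chunks do not use all 256 possible byte values. For example, when the value 00 never occurs in the chunks, it can be used to separate the chunks before compression. Otherwise, you will need an escape code.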

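Either way, you compress one continuous stream that includes the chunk separators. On the receiving side, you decompress the stream, recognize the separators, and reassemble the chunks.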
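
As a minimal sketch of the escape-code variant: the byte values 0x00 and 0x01 and the names appendFramed and splitFramed are illustrative assumptions, not something zlib provides. The framed stream is what would be handed to the compressor, and splitFramed would run on the decompressed output.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical framing bytes; any two values work if both sides agree.
constexpr uint8_t kDelim = 0x00;   // marks the end of one chunk
constexpr uint8_t kEscape = 0x01;  // prefixes a literal kDelim or kEscape

// Append one chunk to the stream that will later be fed to the compressor.
void appendFramed(std::vector<uint8_t>& stream, const std::string& chunk) {
    for (unsigned char byte : chunk) {
        if (byte == kDelim || byte == kEscape)
            stream.push_back(kEscape);  // payload must not mimic a delimiter
        stream.push_back(byte);
    }
    stream.push_back(kDelim);
}

// Recover the chunks from a decompressed stream.
std::vector<std::string> splitFramed(const std::vector<uint8_t>& stream) {
    std::vector<std::string> chunks;
    std::string current;
    bool escaped = false;
    for (uint8_t byte : stream) {
        if (escaped) {
            current.push_back(static_cast<char>(byte));  // literal byte
            escaped = false;
        } else if (byte == kEscape) {
            escaped = true;
        } else if (byte == kDelim) {
            chunks.push_back(current);
            current.clear();
        } else {
            current.push_back(static_cast<char>(byte));
        }
    }
    // This sketch only handles complete streams that end on a delimiter.
    assert(!escaped && current.empty());
    return chunks;
}
```

If the chunks are known never to contain the delimiter byte, the escape handling can be dropped and only the delimiter kept.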

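You can simply do continuous zlib compression, sending data on your channel every time 4K of compressed data has been produced. On the other side, you need to make sure that the decompressor is fed the 4K blocks of compressed data in the correct order.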

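The deflate algorithm in zlib is bursty: it accumulates on the order of 16K to 64K or more of data internally before emitting any compressed data, then delivers a block of compressed data, and then starts accumulating again. So there will be latency unless you ask deflate to flush data. If you want to reduce the latency, you can get smaller blocks by flushing, with some small impact on compression.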
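
One way to guarantee the ordering on the receiving side is to tag each piece with a sequence number. The sketch below is only an illustration under that assumption: the Piece struct and the slice and reassemble helpers are hypothetical names, the zlib calls are left out, and the input is an already-compressed byte vector. If the channel itself preserves order, the sequence number is unnecessary.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct Piece {
    uint32_t seq;               // hypothetical sequence number, not part of zlib
    std::vector<uint8_t> data;  // at most maxSize bytes of compressed output
};

// Cut the compressed stream into pieces no larger than maxSize bytes.
std::vector<Piece> slice(const std::vector<uint8_t>& compressed,
                         std::size_t maxSize) {
    assert(maxSize > 0);
    std::vector<Piece> pieces;
    for (std::size_t pos = 0; pos < compressed.size(); pos += maxSize) {
        std::size_t len = std::min(maxSize, compressed.size() - pos);
        Piece piece;
        piece.seq = static_cast<uint32_t>(pieces.size());
        piece.data.assign(compressed.begin() + pos,
                          compressed.begin() + pos + len);
        pieces.push_back(piece);
    }
    return pieces;
}

// Put the pieces back in order and rebuild the stream for the decompressor.
std::vector<uint8_t> reassemble(std::vector<Piece> pieces) {
    std::sort(pieces.begin(), pieces.end(),
              [](const Piece& a, const Piece& b) { return a.seq < b.seq; });
    std::vector<uint8_t> stream;
    for (const Piece& piece : pieces)
        stream.insert(stream.end(), piece.data.begin(), piece.data.end());
    return stream;
}
```

Because a zlib stream is a plain byte sequence, the cut points can fall anywhere; the decompressor only needs the bytes back in their original order.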

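I managed to design a compressor that sends a growing buffer piece by piece through a channel with a limited input size. I am posting the answer here for anyone dealing with the same problem. Thanks to Mark Adler and MSalters for putting me on the right track.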

#include <cstdint>
#include <string>
#include <vector>
#include <zlib.h>

class zStreamManager {
    public:
        zStreamManager() = default;
        ~zStreamManager();
        void endStream();
        void addToStream(const void *inData, size_t inDataSize);

    private:
        // Base64 turns n bytes into 4 * ceil(n / 3) characters, so 3050
        // compressed bytes become 4068 characters, which fits the
        // 4096-byte channel limit with room for the delimiter.
        // static constexpr so CHUNK_IN can be used as an array size.
        static constexpr size_t CHUNK_IN = 1024, CHUNK_OUT = 3050;
        const std::string base64Chars =
         "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
         "abcdefghijklmnopqrstuvwxyz"
         "0123456789+/";
        bool deallocated = true;   // true while no deflate stream is open
        z_stream stream;
        std::vector<uint8_t> outBuffer;   // compressed bytes not yet sent
        std::string base64Encode(std::vector<uint8_t> &str);
};

zStreamManager::~zStreamManager() {
    endStream();
}

void zStreamManager::endStream() {
    if(!deallocated) {
        deallocated = true;
        uint8_t tempBuffer[CHUNK_IN];
        int response = Z_OK;

        // addToStream() consumed all of its input; only pending output remains.
        stream.next_in = Z_NULL;
        stream.avail_in = 0;

        // next_out must be reset on every pass: the buffer used by
        // addToStream() was local to that call.
        do {
            stream.next_out = tempBuffer;
            stream.avail_out = CHUNK_IN;
            response = deflate(&stream, Z_FINISH);
            size_t have = CHUNK_IN - stream.avail_out;
            outBuffer.insert(outBuffer.end(), tempBuffer, tempBuffer + have);
        } while(response == Z_OK);

        deflateEnd(&stream);

        // SEND stands for whatever writes to the channel; "$" marks the
        // end of the whole stream.
        if(outBuffer.size())
            SEND << base64Encode(outBuffer) << "$";
        outBuffer.clear();
    }
}

void zStreamManager::addToStream(const void *inData, size_t inDataSize) {
    if(deallocated) {
        stream.zalloc = Z_NULL;
        stream.zfree = Z_NULL;
        stream.opaque = Z_NULL;
        if(deflateInit(&stream, Z_BEST_COMPRESSION) != Z_OK)
            return; // the compressor could not be set up; nothing is buffered
        deallocated = false;
    }

    uint8_t tempBuffer[CHUNK_IN];

    stream.next_in = static_cast<Bytef *>(const_cast<void *>(inData));
    stream.avail_in = static_cast<uInt>(inDataSize);

    // Z_SYNC_FLUSH pushes everything consumed so far into the output.
    // zlib requires calling deflate() again with the same flush value for
    // as long as it fills the output buffer completely.
    do {
        stream.next_out = tempBuffer;
        stream.avail_out = CHUNK_IN;
        deflate(&stream, Z_SYNC_FLUSH);
        size_t have = CHUNK_IN - stream.avail_out;
        outBuffer.insert(outBuffer.end(), tempBuffer, tempBuffer + have);
    } while(stream.avail_out == 0);

    // Send every complete CHUNK_OUT-sized part, base64-encoded and
    // terminated by "|".
    while(outBuffer.size() >= CHUNK_OUT) {
        std::vector<uint8_t> zipped(outBuffer.begin(), outBuffer.begin() + CHUNK_OUT);
        outBuffer.erase(outBuffer.begin(), outBuffer.begin() + CHUNK_OUT);
        SEND << base64Encode(zipped) << "|";
    }
}

std::string zStreamManager::base64Encode(std::vector<uint8_t> &str) {
    /* ALTERED VERSION OF René Nyffenegger BASE64 CODE
   Copyright (C) 2004-2008 René Nyffenegger

   This source code is provided 'as-is', without any express or implied
   warranty. In no event will the author be held liable for any damages
   arising from the use of this software.

   Permission is granted to anyone to use this software for any purpose,
   including commercial applications, and to alter it and redistribute it
   freely, subject to the following restrictions:

   1. The origin of this source code must not be misrepresented; you must not
      claim that you wrote the original source code. If you use this source code
      in a product, an acknowledgment in the product documentation would be
      appreciated but is not required.

   2. Altered source versions must be plainly marked as such, and must not be
      misrepresented as being the original source code.

   3. This notice may not be removed or altered from any source distribution.

   René Nyffenegger rene.nyffenegger@adp-gmbh.ch
    */
  unsigned char const* bytes_to_encode = &str[0];
  unsigned int in_len = str.size();
  std::string ret;
  int i = 0, j = 0;
  unsigned char char_array_3[3], char_array_4[4];

  while(in_len--) {
    char_array_3[i++] = *(bytes_to_encode++);
    if (i == 3) {
      char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
      char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
      char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
      char_array_4[3] = char_array_3[2] & 0x3f;

      for(i = 0; (i <4) ; i++)
        ret += base64Chars[char_array_4[i]];
      i = 0;
    }
  }

  if(i) {
    for(j = i; j < 3; j++)
      char_array_3[j] = '\0';

    char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
    char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
    char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
    char_array_4[3] = char_array_3[2] & 0x3f;

    for(j = 0; (j < i + 1); j++)
      ret += base64Chars[char_array_4[j]];

    while((i++ < 3))
      ret += '=';
  }

  return ret;
}

A use case:

zStreamManager zm;
string growingBuffer = "";
bool somethingToSend = true;

while(somethingToSend) {
  RECEIVE(&growingBuffer);
  if(growingBuffer.size()) {
    zm.addToStream(growingBuffer.c_str(), growingBuffer.size());
    growingBuffer.clear();
  } else {
    somethingToSend = false;
  }
}

zm.endStream();

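RECEIVE and SEND stand for the methods used to receive the buffer and to send it through the channel. For decompression, the parts are separated by the "|" character, and the end of the whole buffer is marked by "$". Each part must be base64-decoded, and the decoded parts are then concatenated. Finally, the result can be decompressed with zlib like any other compressed data.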
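
The receiving side described above could be sketched roughly as follows. The names base64Decode and collectParts are hypothetical, and the received text is assumed to be the concatenation of everything sent so far. The actual decompression (for example with zlib's inflate) is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Decode one base64 part (standard alphabet, '=' padding).
std::vector<uint8_t> base64Decode(const std::string& text) {
    static const std::string alphabet =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "abcdefghijklmnopqrstuvwxyz"
        "0123456789+/";
    std::vector<uint8_t> bytes;
    uint32_t bits = 0;  // sliding window of decoded bits
    int bitCount = 0;   // number of bits in the window not yet emitted
    for (char c : text) {
        if (c == '=') break;  // padding: no more data in this part
        std::size_t value = alphabet.find(c);
        assert(value != std::string::npos);  // sketch: input assumed valid
        bits = (bits << 6) | static_cast<uint32_t>(value);
        bitCount += 6;
        if (bitCount >= 8) {
            bitCount -= 8;
            bytes.push_back(static_cast<uint8_t>((bits >> bitCount) & 0xFF));
        }
    }
    return bytes;
}

// Append the decoded parts found in `received` to `zlibStream`.
// Returns true once the final part (terminated by '$') has been seen.
bool collectParts(const std::string& received,
                  std::vector<uint8_t>& zlibStream) {
    std::string part;
    for (char c : received) {
        if (c == '|' || c == '$') {
            std::vector<uint8_t> bytes = base64Decode(part);
            zlibStream.insert(zlibStream.end(), bytes.begin(), bytes.end());
            part.clear();
            if (c == '$') return true;
        } else {
            part.push_back(c);
        }
    }
    return false;  // stream not finished yet
}
```

Neither "|" nor "$" belongs to the base64 alphabet, so they cannot be confused with payload. A full part of 3050 bytes encodes to 4068 characters, which with its delimiter stays under the 4096-byte limit.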