估计压缩整数列表的最大有效负载大小

Estimating max payload size on a compressed list of integers

我在一个应用程序中有 100 万行。它向服务器发出如下请求:

/search?q=hello

并且搜索 return 是一个排序的整数列表,表示在输入数据集中(用户在浏览器中有)匹配的行。我将如何估计有效负载的最大大小 return?例如,开始我们有:

# ~7 MB if we stored "all results" uncompressed
6888887

# ~ 3.5MB if we stored "all results" relative to 0 or ALL matches (cuts it down by two)
3444443

然后我们想使用某种解压缩(Elias-Fano?)来压缩这些整数。对于 1M 排序整数的大小,"worst-case" 场景是什么?以及如何进行计算?

应用程序有一百万行数据,所以假设 R1 --> R1000000,或者如果是零索引,range(int(1e6))。服务器将响应类似:[1,2,3],表示(仅)第 1、2 和 3 行匹配。

2^(10^6) 个不同的排序(无重复)整数列表 < 10^6。将每个这样的列表映射到相应的列表,比如 [0, 4, ...] 位数组(比如 10001....)产生 10^6 位,即 125kB 的信息。由于每个位数组对应于一个唯一的可能排序列表,反之亦然,这是最 紧凑(在某种意义上:具有最小的最大尺寸)表示。

当然,如果某些结果比其他结果更有可能,则可能 效率更高(在某种意义上:具有更小的 平均值 尺寸)表示。 例如,如果大多数结果集都很小,则简单的 运行 长度编码通常会产生较小的编码。

不可避免地,在那种情况下,编码的最大大小(您询问的最大有效负载大小)将超过 125 kB

压缩上述125 kB位数组,例如zlib will yield an acceptably compact encoding for small result sets. Moreover, zlib has a function deflateBound() 给定未压缩的大小,将计算最大负载大小(在您的情况下,它肯定会大于 125 kB,但不会太大)

输入规范:

  • 0到999999之间的行号(如果需要1-indexing,可以应用offset)
  • 每个行号只出现一次
  • 数字按升序排列(很有用,无论如何我们都想对它们进行排序)

您的一个好主意是当匹配数超过可能值的一半时反转结果的含义。让我们保留它,并假设我们有一个标志和一个 matches/misses.

的列表

您最初尝试编码时将数字编码为逗号分隔的文本。这意味着对于 90% 的可能值,您需要 6 个字符 + 1 个分隔符——因此平均需要 7 个字节。但是,由于最大值为 999999,因此您实际上只需要 20 位来对每个条目进行编码。

因此,减小尺寸的第一个想法是使用二进制编码。


二进制编码

最简单的方法是写入发送的值的数量,然后是 32 位整数流。

一种更有效的方法是将两个 20 位值打包到写入的每 5 个字节中。如果是奇数,您只需用零填充 4 个多余的位。

这些方法可能适用于少量匹配(或未命中)。然而,需要注意的重要一点是,对于每一行,我们只需要跟踪 1 位信息——无论它是否存在。这意味着我们可以将结果编码为 1000000 位的位图。

结合这两种方法,我们可以在有很多匹配或未命中时使用位图,并在效率更高时切换到二进制编码。


范围缩小

编码排序的整数序列时使用的下一个潜在改进是使用范围缩减。

想法是按从大到小的顺序对值进行编码,并随着值变小而减少每个值的位数。

  • 首先,我们对表示第一个值所需的位数 N 进行编码。
  • 我们使用 N
  • 对第一个值进行编码
  • 对于以下每个值
    • 使用 N 位对值进行编码
    • 如果值需要较少的位来编码,适当减少N

熵编码

让我们回到位图编码。基于 Shannon entropy theory 最坏的情况是我们有 50% 的匹配。概率偏差越大,我们平均需要对每个条目进行编码的位数就越少。

Matches | Bits
--------+-----------
0       | 0
1       | 22
2       | 41
3       | 60
4       | 78
5       | 96
10      | 181
100     | 1474
1000    | 11408
10000   | 80794
100000  | 468996
250000  | 811279
500000  | 1000000 

为此,我们需要使用可以对小数位进行编码的熵编码器——类似于算术或范围编码器或一些新的基于 ANS 的编码器,如 FSE。或者,我们可以将符号组合在一起并使用霍夫曼编码。


原型和测量

我使用 Amir Said 的 FastAC 的 32 位实现编写了一个测试,它将模型限制为小数点后 4 位。 (这不是真正的问题,因为我们不应该将此类数据直接提供给编解码器。这只是一个演示。)

首先是一些常用代码:

typedef std::vector<uint8_t> match_symbols_t;
typedef std::vector<uint32_t> match_list_t;
typedef std::set<uint32_t> match_set_t;
typedef std::vector<uint8_t> buffer_t;
// ----------------------------------------------------------------------------
static uint32_t const NUM_VALUES(1000000);
// ============================================================================
size_t symbol_count(uint8_t bits)
{
    size_t count(NUM_VALUES / bits);
    if (NUM_VALUES % bits > 0) {
        return count + 1;
    }
    return count;
}
// ----------------------------------------------------------------------------
void set_symbol(match_symbols_t& symbols, uint8_t bits, uint32_t match, bool state)
{
    size_t index(match / bits);
    size_t offset(match % bits);
    if (state) {
        symbols[index] |= 1 << offset;
    } else {
        symbols[index] &= ~(1 << offset);
    }
}
// ----------------------------------------------------------------------------
bool get_symbol(match_symbols_t const& symbols, uint8_t bits, uint32_t match)
{
    size_t index(match / bits);
    size_t offset(match % bits);
    return (symbols[index] & (1 << offset)) != 0;
}
// ----------------------------------------------------------------------------
match_symbols_t make_symbols(match_list_t const& matches, uint8_t bits)
{
    assert((bits > 0) && (bits <= 8));

    match_symbols_t symbols(symbol_count(bits), 0);
    for (auto match : matches) {
        set_symbol(symbols, bits, match, true);
    }

    return symbols;
}
// ----------------------------------------------------------------------------
match_list_t make_matches(match_symbols_t const& symbols, uint8_t bits)
{
    match_list_t result;
    for (uint32_t i(0); i < 1000000; ++i) {
        if (get_symbol(symbols, bits, i)) {
            result.push_back(i);
        }
    }
    return result;
}

首先,更简单的变体是写入匹配数,确定 match/miss 的概率并将其限制在支持的范围内。 然后使用这个静态概率模型简单地编码位图的每个值。

class arithmetic_codec_v1
{
public:
    buffer_t compress(match_list_t const& matches)
    {
        uint32_t match_count(static_cast<uint32_t>(matches.size()));

        arithmetic_codec codec(static_cast<uint32_t>(NUM_VALUES / 4));
        codec.start_encoder();

        // Store the number of matches (1000000 needs only 20 bits)
        codec.put_bits(match_count, 20);

        if (match_count > 0) {
            // Initialize the model
            static_bit_model model;
            model.set_probability_0(get_probability_0(match_count));

            // Create a bitmap and code all the bitmap entries
            // NB: This is lazy and inefficient, but simple
            match_symbols_t symbols = make_symbols(matches, 1);
            for (auto entry : symbols) {
                codec.encode(entry, model);
            }
        }

        uint32_t compressed_size = codec.stop_encoder();
        return buffer_t(codec.buffer(), codec.buffer() + compressed_size);
    }

    match_list_t decompress(buffer_t& compressed)
    {
        arithmetic_codec codec(static_cast<uint32_t>(compressed.size()), &compressed[0]);
        codec.start_decoder();

        // Read number of matches (20 bits)
        uint32_t match_count(codec.get_bits(20));

        match_list_t result;
        if (match_count > 0) {
            static_bit_model model;
            model.set_probability_0(get_probability_0(match_count));

            result.reserve(match_count);
            for (uint32_t i(0); i < NUM_VALUES; ++i) {
                uint32_t entry = codec.decode(model);
                if (entry == 1) {
                    result.push_back(i);
                }
            }
        }

        codec.stop_decoder();
        return result;
    }

private:
    double get_probability_0(uint32_t match_count, uint32_t num_values = NUM_VALUES)
    {
        double probability_0(double(num_values - match_count) / num_values);
        // Limit probability to match FastAC limitations...
        return std::max(0.0001, std::min(0.9999, probability_0));
    }
};

第二种方法是根据我们编码的符号调整模型。 每次匹配编码后,降低下一次匹配的概率。 一旦我们编码了所有匹配项,就停止。

第二个变体压缩效果稍好,但性能成本显着。

class arithmetic_codec_v2
{
public:
    buffer_t compress(match_list_t const& matches)
    {
        uint32_t match_count(static_cast<uint32_t>(matches.size()));
        uint32_t total_count(NUM_VALUES);

        arithmetic_codec codec(static_cast<uint32_t>(NUM_VALUES / 4));
        codec.start_encoder();

        // Store the number of matches (1000000 needs only 20 bits)
        codec.put_bits(match_count, 20);

        if (match_count > 0) {
            static_bit_model model;

            // Create a bitmap and code all the bitmap entries
            // NB: This is lazy and inefficient, but simple
            match_symbols_t symbols = make_symbols(matches, 1);
            for (auto entry : symbols) {
                model.set_probability_0(get_probability_0(match_count, total_count));
                codec.encode(entry, model);
                --total_count;
                if (entry) {
                    --match_count;
                }
                if (match_count == 0) {
                    break;
                }
            }
        }

        uint32_t compressed_size = codec.stop_encoder();
        return buffer_t(codec.buffer(), codec.buffer() + compressed_size);
    }

    match_list_t decompress(buffer_t& compressed)
    {
        arithmetic_codec codec(static_cast<uint32_t>(compressed.size()), &compressed[0]);
        codec.start_decoder();

        // Read number of matches (20 bits)
        uint32_t match_count(codec.get_bits(20));
        uint32_t total_count(NUM_VALUES);

        match_list_t result;
        if (match_count > 0) {
            static_bit_model model;
            result.reserve(match_count);
            for (uint32_t i(0); i < NUM_VALUES; ++i) {
                model.set_probability_0(get_probability_0(match_count, NUM_VALUES - i));
                if (codec.decode(model) == 1) {
                    result.push_back(i);
                    --match_count;
                }
                if (match_count == 0) {
                    break;
                }
            }
        }

        codec.stop_decoder();
        return result;
    }

private:
    double get_probability_0(uint32_t match_count, uint32_t num_values = NUM_VALUES)
    {
        double probability_0(double(num_values - match_count) / num_values);
        // Limit probability to match FastAC limitations...
        return std::max(0.0001, std::min(0.9999, probability_0));
    }
};


实用方法

实际上,设计一种新的压缩格式可能不值得。 事实上,将结果写为位可能甚至不值得,只需创建一个值为 0 或 1 的字节数组即可。 然后使用现有的压缩库 -- zlib 很常见,或者您可以尝试 lz4 或 snappy、bzip2、lzma...选择很多。

ZLib 示例

class zlib_codec
{
public:
    zlib_codec(uint32_t bits_per_symbol) : bits_per_symbol(bits_per_symbol) {}

    buffer_t compress(match_list_t const& matches)
    {
        match_symbols_t symbols(make_symbols(matches, bits_per_symbol));

        z_stream defstream;
        defstream.zalloc = nullptr;
        defstream.zfree = nullptr;
        defstream.opaque = nullptr;

        deflateInit(&defstream, Z_BEST_COMPRESSION);
        size_t max_compress_size = deflateBound(&defstream, static_cast<uLong>(symbols.size()));

        buffer_t compressed(max_compress_size);

        defstream.avail_in = static_cast<uInt>(symbols.size());
        defstream.next_in = &symbols[0];
        defstream.avail_out = static_cast<uInt>(max_compress_size);
        defstream.next_out = &compressed[0];

        deflate(&defstream, Z_FINISH);
        deflateEnd(&defstream);

        compressed.resize(defstream.total_out);
        return compressed;
    }

    match_list_t decompress(buffer_t& compressed)
    {
        z_stream infstream;
        infstream.zalloc = nullptr;
        infstream.zfree = nullptr;
        infstream.opaque = nullptr;

        inflateInit(&infstream);

        match_symbols_t symbols(symbol_count(bits_per_symbol));

        infstream.avail_in = static_cast<uInt>(compressed.size());
        infstream.next_in = &compressed[0];
        infstream.avail_out = static_cast<uInt>(symbols.size());
        infstream.next_out = &symbols[0];

        inflate(&infstream, Z_FINISH);
        inflateEnd(&infstream);

        return make_matches(symbols, bits_per_symbol);
    }
private:
    uint32_t bits_per_symbol;
};

BZip2 示例

class bzip2_codec
{
public:
    bzip2_codec(uint32_t bits_per_symbol) : bits_per_symbol(bits_per_symbol) {}

    buffer_t compress(match_list_t const& matches)
    {
        match_symbols_t symbols(make_symbols(matches, bits_per_symbol));

        uint32_t compressed_size = symbols.size() * 2;
        buffer_t compressed(compressed_size);

        int err = BZ2_bzBuffToBuffCompress((char*)&compressed[0]
            , &compressed_size
            , (char*)&symbols[0]
            , symbols.size()
            , 9
            , 0
            , 30);
        if (err != BZ_OK) {
            throw std::runtime_error("Compression error.");
        }

        compressed.resize(compressed_size);
        return compressed;
    }

    match_list_t decompress(buffer_t& compressed)
    {
        match_symbols_t symbols(symbol_count(bits_per_symbol));

        uint32_t decompressed_size = symbols.size();
        int err = BZ2_bzBuffToBuffDecompress((char*)&symbols[0]
            , &decompressed_size
            , (char*)&compressed[0]
            , compressed.size()
            , 0
            , 0);
        if (err != BZ_OK) {
            throw std::runtime_error("Compression error.");
        }
        if (decompressed_size != symbols.size()) {
            throw std::runtime_error("Size mismatch.");
        }

        return make_matches(symbols, bits_per_symbol);
    }
private:
    uint32_t bits_per_symbol;
};


比较


代码存储库,包括 64 位的依赖项 Visual Studio 2015 年在 https://github.com/dan-masek/bounded_sorted_list_compression.git

存储排序整数的压缩列表在数据检索和数据库应用程序中极为常见,并且已经开发出多种技术。

我敢肯定,在你的列表中随机选择大约一半的项目将是你最糟糕的情况。

许多流行的 integer-list-compression 技术,例如 Roaring 位图,回退到使用(具有这样的 worst-case 输入数据)1-bit-per-index 位图。

因此,在您的情况下,如果有 100 万行,返回的最大负载大小将是(在最坏的情况下)设置了 "using a bitmap" 标志的 header, 后跟 100 万位(125,000 字节)的位图,例如,如果数据库中的第 700 行匹配,则位图的第 700 位设置为 1,如果数据库中的第 700 行不匹配,则位图的第 700 位设置为 0匹配。 (谢谢 Dan Mašek!)

我的理解是,虽然 quasi-succinct Elias-Fano 压缩和其他技术对于压缩许多 "naturally-occurring" 组排序整数非常有用,但对于这个 worst-case 数据集, none 其中的压缩比简单的位图更好,而大多数 "compression" 比简单的位图差得多。

(这类似于大多数 general-purpose 数据压缩算法的方式,例如 DEFLATE,当输入 "worst-case" 数据(例如 indistinguishable-from-random 加密数据时,创建 "compressed"带有几个字节开销的文件,设置了 "stored/raw/literal" 标志,后跟未压缩文件的简单副本。