估计压缩整数列表的最大有效负载大小
Estimating max payload size on a compressed list of integers
我在一个应用程序中有 100 万行。它向服务器发出如下请求:
/search?q=hello
并且搜索 return 是一个排序的整数列表,表示在输入数据集中(用户在浏览器中有)匹配的行。我将如何估计有效负载的最大大小 return?例如,开始我们有:
# ~7 MB if we stored "all results" uncompressed
6888887
# ~ 3.5MB if we stored "all results" relative to 0 or ALL matches (cuts it down by two)
3444443
然后我们想使用某种解压缩(Elias-Fano?)来压缩这些整数。对于 1M 排序整数的大小,"worst-case" 场景是什么?以及如何进行计算?
应用程序有一百万行数据,所以假设 R1 --> R1000000,或者如果是零索引,range(int(1e6))
。服务器将响应类似:[1,2,3]
,表示(仅)第 1、2 和 3 行匹配。
有 2^(10^6)
个不同的排序(无重复)整数列表 < 10^6
。将每个这样的列表映射到相应的列表,比如 [0, 4, ...]
位数组(比如 10001....
)产生 10^6
位,即 125kB 的信息。由于每个位数组对应于一个唯一的可能排序列表,反之亦然,这是最
紧凑(在某种意义上:具有最小的最大尺寸)表示。
当然,如果某些结果比其他结果更有可能,则可能 效率更高(在某种意义上:具有更小的 平均值 尺寸)表示。
例如,如果大多数结果集都很小,则简单的 运行 长度编码通常会产生较小的编码。
不可避免地,在那种情况下,编码的最大大小(您询问的最大有效负载大小)将超过 125 kB
压缩上述125 kB位数组,例如zlib will yield an acceptably compact encoding for small result sets. Moreover, zlib has a function deflateBound() 给定未压缩的大小,将计算最大负载大小(在您的情况下,它肯定会大于 125 kB,但不会太大)
输入规范:
- 0到999999之间的行号(如果需要1-indexing,可以应用offset)
- 每个行号只出现一次
- 数字按升序排列(很有用,无论如何我们都想对它们进行排序)
您的一个好主意是当匹配数超过可能值的一半时反转结果的含义。让我们保留它,并假设我们有一个标志和一个 matches/misses.
的列表
您最初尝试编码时将数字编码为逗号分隔的文本。这意味着对于 90% 的可能值,您需要 6 个字符 + 1 个分隔符——因此平均需要 7 个字节。但是,由于最大值为 999999,因此您实际上只需要 20 位来对每个条目进行编码。
因此,减小尺寸的第一个想法是使用二进制编码。
二进制编码
最简单的方法是写入发送的值的数量,然后是 32 位整数流。
一种更有效的方法是将两个 20 位值打包到写入的每 5 个字节中。如果是奇数,您只需用零填充 4 个多余的位。
这些方法可能适用于少量匹配(或未命中)。然而,需要注意的重要一点是,对于每一行,我们只需要跟踪 1 位信息——无论它是否存在。这意味着我们可以将结果编码为 1000000 位的位图。
结合这两种方法,我们可以在有很多匹配或未命中时使用位图,并在效率更高时切换到二进制编码。
范围缩小
编码排序的整数序列时使用的下一个潜在改进是使用范围缩减。
想法是按从大到小的顺序对值进行编码,并随着值变小而减少每个值的位数。
- 首先,我们对表示第一个值所需的位数
N
进行编码。
- 我们使用
N
位 对第一个值进行编码
- 对于以下每个值
- 使用
N
位对值进行编码
- 如果值需要较少的位来编码,适当减少
N
熵编码
让我们回到位图编码。基于 Shannon entropy theory
最坏的情况是我们有 50% 的匹配。概率偏差越大,我们平均需要对每个条目进行编码的位数就越少。
Matches | Bits
--------+-----------
0 | 0
1 | 22
2 | 41
3 | 60
4 | 78
5 | 96
10 | 181
100 | 1474
1000 | 11408
10000 | 80794
100000 | 468996
250000 | 811279
500000 | 1000000
为此,我们需要使用可以对小数位进行编码的熵编码器——类似于算术或范围编码器或一些新的基于 ANS 的编码器,如 FSE。或者,我们可以将符号组合在一起并使用霍夫曼编码。
原型和测量
我使用 Amir Said 的 FastAC 的 32 位实现编写了一个测试,它将模型限制为小数点后 4 位。
(这不是真正的问题,因为我们不应该将此类数据直接提供给编解码器。这只是一个演示。)
首先是一些常用代码:
typedef std::vector<uint8_t> match_symbols_t;
typedef std::vector<uint32_t> match_list_t;
typedef std::set<uint32_t> match_set_t;
typedef std::vector<uint8_t> buffer_t;
// ----------------------------------------------------------------------------
static uint32_t const NUM_VALUES(1000000);
// ============================================================================
size_t symbol_count(uint8_t bits)
{
size_t count(NUM_VALUES / bits);
if (NUM_VALUES % bits > 0) {
return count + 1;
}
return count;
}
// ----------------------------------------------------------------------------
void set_symbol(match_symbols_t& symbols, uint8_t bits, uint32_t match, bool state)
{
size_t index(match / bits);
size_t offset(match % bits);
if (state) {
symbols[index] |= 1 << offset;
} else {
symbols[index] &= ~(1 << offset);
}
}
// ----------------------------------------------------------------------------
bool get_symbol(match_symbols_t const& symbols, uint8_t bits, uint32_t match)
{
size_t index(match / bits);
size_t offset(match % bits);
return (symbols[index] & (1 << offset)) != 0;
}
// ----------------------------------------------------------------------------
match_symbols_t make_symbols(match_list_t const& matches, uint8_t bits)
{
assert((bits > 0) && (bits <= 8));
match_symbols_t symbols(symbol_count(bits), 0);
for (auto match : matches) {
set_symbol(symbols, bits, match, true);
}
return symbols;
}
// ----------------------------------------------------------------------------
match_list_t make_matches(match_symbols_t const& symbols, uint8_t bits)
{
match_list_t result;
for (uint32_t i(0); i < 1000000; ++i) {
if (get_symbol(symbols, bits, i)) {
result.push_back(i);
}
}
return result;
}
首先,更简单的变体是写入匹配数,确定 match/miss 的概率并将其限制在支持的范围内。
然后使用这个静态概率模型简单地编码位图的每个值。
class arithmetic_codec_v1
{
public:
buffer_t compress(match_list_t const& matches)
{
uint32_t match_count(static_cast<uint32_t>(matches.size()));
arithmetic_codec codec(static_cast<uint32_t>(NUM_VALUES / 4));
codec.start_encoder();
// Store the number of matches (1000000 needs only 20 bits)
codec.put_bits(match_count, 20);
if (match_count > 0) {
// Initialize the model
static_bit_model model;
model.set_probability_0(get_probability_0(match_count));
// Create a bitmap and code all the bitmap entries
// NB: This is lazy and inefficient, but simple
match_symbols_t symbols = make_symbols(matches, 1);
for (auto entry : symbols) {
codec.encode(entry, model);
}
}
uint32_t compressed_size = codec.stop_encoder();
return buffer_t(codec.buffer(), codec.buffer() + compressed_size);
}
match_list_t decompress(buffer_t& compressed)
{
arithmetic_codec codec(static_cast<uint32_t>(compressed.size()), &compressed[0]);
codec.start_decoder();
// Read number of matches (20 bits)
uint32_t match_count(codec.get_bits(20));
match_list_t result;
if (match_count > 0) {
static_bit_model model;
model.set_probability_0(get_probability_0(match_count));
result.reserve(match_count);
for (uint32_t i(0); i < NUM_VALUES; ++i) {
uint32_t entry = codec.decode(model);
if (entry == 1) {
result.push_back(i);
}
}
}
codec.stop_decoder();
return result;
}
private:
double get_probability_0(uint32_t match_count, uint32_t num_values = NUM_VALUES)
{
double probability_0(double(num_values - match_count) / num_values);
// Limit probability to match FastAC limitations...
return std::max(0.0001, std::min(0.9999, probability_0));
}
};
第二种方法是根据我们编码的符号调整模型。
每次匹配编码后,降低下一次匹配的概率。
一旦我们编码了所有匹配项,就停止。
第二个变体压缩效果稍好,但性能成本显着。
class arithmetic_codec_v2
{
public:
buffer_t compress(match_list_t const& matches)
{
uint32_t match_count(static_cast<uint32_t>(matches.size()));
uint32_t total_count(NUM_VALUES);
arithmetic_codec codec(static_cast<uint32_t>(NUM_VALUES / 4));
codec.start_encoder();
// Store the number of matches (1000000 needs only 20 bits)
codec.put_bits(match_count, 20);
if (match_count > 0) {
static_bit_model model;
// Create a bitmap and code all the bitmap entries
// NB: This is lazy and inefficient, but simple
match_symbols_t symbols = make_symbols(matches, 1);
for (auto entry : symbols) {
model.set_probability_0(get_probability_0(match_count, total_count));
codec.encode(entry, model);
--total_count;
if (entry) {
--match_count;
}
if (match_count == 0) {
break;
}
}
}
uint32_t compressed_size = codec.stop_encoder();
return buffer_t(codec.buffer(), codec.buffer() + compressed_size);
}
match_list_t decompress(buffer_t& compressed)
{
arithmetic_codec codec(static_cast<uint32_t>(compressed.size()), &compressed[0]);
codec.start_decoder();
// Read number of matches (20 bits)
uint32_t match_count(codec.get_bits(20));
uint32_t total_count(NUM_VALUES);
match_list_t result;
if (match_count > 0) {
static_bit_model model;
result.reserve(match_count);
for (uint32_t i(0); i < NUM_VALUES; ++i) {
model.set_probability_0(get_probability_0(match_count, NUM_VALUES - i));
if (codec.decode(model) == 1) {
result.push_back(i);
--match_count;
}
if (match_count == 0) {
break;
}
}
}
codec.stop_decoder();
return result;
}
private:
double get_probability_0(uint32_t match_count, uint32_t num_values = NUM_VALUES)
{
double probability_0(double(num_values - match_count) / num_values);
// Limit probability to match FastAC limitations...
return std::max(0.0001, std::min(0.9999, probability_0));
}
};
实用方法
实际上,设计一种新的压缩格式可能不值得。
事实上,将结果写为位可能甚至不值得,只需创建一个值为 0 或 1 的字节数组即可。
然后使用现有的压缩库 -- zlib 很常见,或者您可以尝试 lz4 或 snappy、bzip2、lzma...选择很多。
ZLib 示例
class zlib_codec
{
public:
zlib_codec(uint32_t bits_per_symbol) : bits_per_symbol(bits_per_symbol) {}
buffer_t compress(match_list_t const& matches)
{
match_symbols_t symbols(make_symbols(matches, bits_per_symbol));
z_stream defstream;
defstream.zalloc = nullptr;
defstream.zfree = nullptr;
defstream.opaque = nullptr;
deflateInit(&defstream, Z_BEST_COMPRESSION);
size_t max_compress_size = deflateBound(&defstream, static_cast<uLong>(symbols.size()));
buffer_t compressed(max_compress_size);
defstream.avail_in = static_cast<uInt>(symbols.size());
defstream.next_in = &symbols[0];
defstream.avail_out = static_cast<uInt>(max_compress_size);
defstream.next_out = &compressed[0];
deflate(&defstream, Z_FINISH);
deflateEnd(&defstream);
compressed.resize(defstream.total_out);
return compressed;
}
match_list_t decompress(buffer_t& compressed)
{
z_stream infstream;
infstream.zalloc = nullptr;
infstream.zfree = nullptr;
infstream.opaque = nullptr;
inflateInit(&infstream);
match_symbols_t symbols(symbol_count(bits_per_symbol));
infstream.avail_in = static_cast<uInt>(compressed.size());
infstream.next_in = &compressed[0];
infstream.avail_out = static_cast<uInt>(symbols.size());
infstream.next_out = &symbols[0];
inflate(&infstream, Z_FINISH);
inflateEnd(&infstream);
return make_matches(symbols, bits_per_symbol);
}
private:
uint32_t bits_per_symbol;
};
BZip2 示例
class bzip2_codec
{
public:
bzip2_codec(uint32_t bits_per_symbol) : bits_per_symbol(bits_per_symbol) {}
buffer_t compress(match_list_t const& matches)
{
match_symbols_t symbols(make_symbols(matches, bits_per_symbol));
uint32_t compressed_size = symbols.size() * 2;
buffer_t compressed(compressed_size);
int err = BZ2_bzBuffToBuffCompress((char*)&compressed[0]
, &compressed_size
, (char*)&symbols[0]
, symbols.size()
, 9
, 0
, 30);
if (err != BZ_OK) {
throw std::runtime_error("Compression error.");
}
compressed.resize(compressed_size);
return compressed;
}
match_list_t decompress(buffer_t& compressed)
{
match_symbols_t symbols(symbol_count(bits_per_symbol));
uint32_t decompressed_size = symbols.size();
int err = BZ2_bzBuffToBuffDecompress((char*)&symbols[0]
, &decompressed_size
, (char*)&compressed[0]
, compressed.size()
, 0
, 0);
if (err != BZ_OK) {
throw std::runtime_error("Compression error.");
}
if (decompressed_size != symbols.size()) {
throw std::runtime_error("Size mismatch.");
}
return make_matches(symbols, bits_per_symbol);
}
private:
uint32_t bits_per_symbol;
};
比较
代码存储库,包括 64 位的依赖项 Visual Studio 2015 年在 https://github.com/dan-masek/bounded_sorted_list_compression.git
存储排序整数的压缩列表在数据检索和数据库应用程序中极为常见,并且已经开发出多种技术。
我敢肯定,在你的列表中随机选择大约一半的项目将是你最糟糕的情况。
许多流行的 integer-list-compression 技术,例如 Roaring 位图,回退到使用(具有这样的 worst-case 输入数据)1-bit-per-index 位图。
因此,在您的情况下,如果有 100 万行,返回的最大负载大小将是(在最坏的情况下)设置了 "using a bitmap" 标志的 header,
后跟 100 万位(125,000 字节)的位图,例如,如果数据库中的第 700 行匹配,则位图的第 700 位设置为 1,如果数据库中的第 700 行不匹配,则位图的第 700 位设置为 0匹配。 (谢谢 Dan Mašek!)
我的理解是,虽然 quasi-succinct Elias-Fano 压缩和其他技术对于压缩许多 "naturally-occurring" 组排序整数非常有用,但对于这个 worst-case 数据集, none 其中的压缩比简单的位图更好,而大多数 "compression" 比简单的位图差得多。
(这类似于大多数 general-purpose 数据压缩算法的方式,例如 DEFLATE,当输入 "worst-case" 数据(例如 indistinguishable-from-random 加密数据时,创建 "compressed"带有几个字节开销的文件,设置了 "stored/raw/literal" 标志,后跟未压缩文件的简单副本。
我在一个应用程序中有 100 万行。它向服务器发出如下请求:
/search?q=hello
并且搜索 return 是一个排序的整数列表,表示在输入数据集中(用户在浏览器中有)匹配的行。我将如何估计有效负载的最大大小 return?例如,开始我们有:
# ~7 MB if we stored "all results" uncompressed
6888887
# ~ 3.5MB if we stored "all results" relative to 0 or ALL matches (cuts it down by two)
3444443
然后我们想使用某种解压缩(Elias-Fano?)来压缩这些整数。对于 1M 排序整数的大小,"worst-case" 场景是什么?以及如何进行计算?
应用程序有一百万行数据,所以假设 R1 --> R1000000,或者如果是零索引,range(int(1e6))
。服务器将响应类似:[1,2,3]
,表示(仅)第 1、2 和 3 行匹配。
有 2^(10^6)
个不同的排序(无重复)整数列表 < 10^6
。将每个这样的列表映射到相应的列表,比如 [0, 4, ...]
位数组(比如 10001....
)产生 10^6
位,即 125kB 的信息。由于每个位数组对应于一个唯一的可能排序列表,反之亦然,这是最
紧凑(在某种意义上:具有最小的最大尺寸)表示。
当然,如果某些结果比其他结果更有可能,则可能 效率更高(在某种意义上:具有更小的 平均值 尺寸)表示。 例如,如果大多数结果集都很小,则简单的 运行 长度编码通常会产生较小的编码。
不可避免地,在那种情况下,编码的最大大小(您询问的最大有效负载大小)将超过 125 kB
压缩上述125 kB位数组,例如zlib will yield an acceptably compact encoding for small result sets. Moreover, zlib has a function deflateBound() 给定未压缩的大小,将计算最大负载大小(在您的情况下,它肯定会大于 125 kB,但不会太大)
输入规范:
- 0到999999之间的行号(如果需要1-indexing,可以应用offset)
- 每个行号只出现一次
- 数字按升序排列(很有用,无论如何我们都想对它们进行排序)
您的一个好主意是当匹配数超过可能值的一半时反转结果的含义。让我们保留它,并假设我们有一个标志和一个 matches/misses.
的列表您最初尝试编码时将数字编码为逗号分隔的文本。这意味着对于 90% 的可能值,您需要 6 个字符 + 1 个分隔符——因此平均需要 7 个字节。但是,由于最大值为 999999,因此您实际上只需要 20 位来对每个条目进行编码。
因此,减小尺寸的第一个想法是使用二进制编码。
二进制编码
最简单的方法是写入发送的值的数量,然后是 32 位整数流。
一种更有效的方法是将两个 20 位值打包到写入的每 5 个字节中。如果是奇数,您只需用零填充 4 个多余的位。
这些方法可能适用于少量匹配(或未命中)。然而,需要注意的重要一点是,对于每一行,我们只需要跟踪 1 位信息——无论它是否存在。这意味着我们可以将结果编码为 1000000 位的位图。
结合这两种方法,我们可以在有很多匹配或未命中时使用位图,并在效率更高时切换到二进制编码。
范围缩小
编码排序的整数序列时使用的下一个潜在改进是使用范围缩减。
想法是按从大到小的顺序对值进行编码,并随着值变小而减少每个值的位数。
- 首先,我们对表示第一个值所需的位数
N
进行编码。 - 我们使用
N
位 对第一个值进行编码
- 对于以下每个值
- 使用
N
位对值进行编码 - 如果值需要较少的位来编码,适当减少
N
- 使用
熵编码
让我们回到位图编码。基于 Shannon entropy theory 最坏的情况是我们有 50% 的匹配。概率偏差越大,我们平均需要对每个条目进行编码的位数就越少。
Matches | Bits
--------+-----------
0 | 0
1 | 22
2 | 41
3 | 60
4 | 78
5 | 96
10 | 181
100 | 1474
1000 | 11408
10000 | 80794
100000 | 468996
250000 | 811279
500000 | 1000000
为此,我们需要使用可以对小数位进行编码的熵编码器——类似于算术或范围编码器或一些新的基于 ANS 的编码器,如 FSE。或者,我们可以将符号组合在一起并使用霍夫曼编码。
原型和测量
我使用 Amir Said 的 FastAC 的 32 位实现编写了一个测试,它将模型限制为小数点后 4 位。 (这不是真正的问题,因为我们不应该将此类数据直接提供给编解码器。这只是一个演示。)
首先是一些常用代码:
typedef std::vector<uint8_t> match_symbols_t;
typedef std::vector<uint32_t> match_list_t;
typedef std::set<uint32_t> match_set_t;
typedef std::vector<uint8_t> buffer_t;
// ----------------------------------------------------------------------------
static uint32_t const NUM_VALUES(1000000);
// ============================================================================
size_t symbol_count(uint8_t bits)
{
size_t count(NUM_VALUES / bits);
if (NUM_VALUES % bits > 0) {
return count + 1;
}
return count;
}
// ----------------------------------------------------------------------------
void set_symbol(match_symbols_t& symbols, uint8_t bits, uint32_t match, bool state)
{
size_t index(match / bits);
size_t offset(match % bits);
if (state) {
symbols[index] |= 1 << offset;
} else {
symbols[index] &= ~(1 << offset);
}
}
// ----------------------------------------------------------------------------
bool get_symbol(match_symbols_t const& symbols, uint8_t bits, uint32_t match)
{
size_t index(match / bits);
size_t offset(match % bits);
return (symbols[index] & (1 << offset)) != 0;
}
// ----------------------------------------------------------------------------
match_symbols_t make_symbols(match_list_t const& matches, uint8_t bits)
{
assert((bits > 0) && (bits <= 8));
match_symbols_t symbols(symbol_count(bits), 0);
for (auto match : matches) {
set_symbol(symbols, bits, match, true);
}
return symbols;
}
// ----------------------------------------------------------------------------
match_list_t make_matches(match_symbols_t const& symbols, uint8_t bits)
{
match_list_t result;
for (uint32_t i(0); i < 1000000; ++i) {
if (get_symbol(symbols, bits, i)) {
result.push_back(i);
}
}
return result;
}
首先,更简单的变体是写入匹配数,确定 match/miss 的概率并将其限制在支持的范围内。 然后使用这个静态概率模型简单地编码位图的每个值。
class arithmetic_codec_v1
{
public:
buffer_t compress(match_list_t const& matches)
{
uint32_t match_count(static_cast<uint32_t>(matches.size()));
arithmetic_codec codec(static_cast<uint32_t>(NUM_VALUES / 4));
codec.start_encoder();
// Store the number of matches (1000000 needs only 20 bits)
codec.put_bits(match_count, 20);
if (match_count > 0) {
// Initialize the model
static_bit_model model;
model.set_probability_0(get_probability_0(match_count));
// Create a bitmap and code all the bitmap entries
// NB: This is lazy and inefficient, but simple
match_symbols_t symbols = make_symbols(matches, 1);
for (auto entry : symbols) {
codec.encode(entry, model);
}
}
uint32_t compressed_size = codec.stop_encoder();
return buffer_t(codec.buffer(), codec.buffer() + compressed_size);
}
match_list_t decompress(buffer_t& compressed)
{
arithmetic_codec codec(static_cast<uint32_t>(compressed.size()), &compressed[0]);
codec.start_decoder();
// Read number of matches (20 bits)
uint32_t match_count(codec.get_bits(20));
match_list_t result;
if (match_count > 0) {
static_bit_model model;
model.set_probability_0(get_probability_0(match_count));
result.reserve(match_count);
for (uint32_t i(0); i < NUM_VALUES; ++i) {
uint32_t entry = codec.decode(model);
if (entry == 1) {
result.push_back(i);
}
}
}
codec.stop_decoder();
return result;
}
private:
double get_probability_0(uint32_t match_count, uint32_t num_values = NUM_VALUES)
{
double probability_0(double(num_values - match_count) / num_values);
// Limit probability to match FastAC limitations...
return std::max(0.0001, std::min(0.9999, probability_0));
}
};
第二种方法是根据我们编码的符号调整模型。 每次匹配编码后,降低下一次匹配的概率。 一旦我们编码了所有匹配项,就停止。
第二个变体压缩效果稍好,但性能成本显着。
class arithmetic_codec_v2
{
public:
buffer_t compress(match_list_t const& matches)
{
uint32_t match_count(static_cast<uint32_t>(matches.size()));
uint32_t total_count(NUM_VALUES);
arithmetic_codec codec(static_cast<uint32_t>(NUM_VALUES / 4));
codec.start_encoder();
// Store the number of matches (1000000 needs only 20 bits)
codec.put_bits(match_count, 20);
if (match_count > 0) {
static_bit_model model;
// Create a bitmap and code all the bitmap entries
// NB: This is lazy and inefficient, but simple
match_symbols_t symbols = make_symbols(matches, 1);
for (auto entry : symbols) {
model.set_probability_0(get_probability_0(match_count, total_count));
codec.encode(entry, model);
--total_count;
if (entry) {
--match_count;
}
if (match_count == 0) {
break;
}
}
}
uint32_t compressed_size = codec.stop_encoder();
return buffer_t(codec.buffer(), codec.buffer() + compressed_size);
}
match_list_t decompress(buffer_t& compressed)
{
arithmetic_codec codec(static_cast<uint32_t>(compressed.size()), &compressed[0]);
codec.start_decoder();
// Read number of matches (20 bits)
uint32_t match_count(codec.get_bits(20));
uint32_t total_count(NUM_VALUES);
match_list_t result;
if (match_count > 0) {
static_bit_model model;
result.reserve(match_count);
for (uint32_t i(0); i < NUM_VALUES; ++i) {
model.set_probability_0(get_probability_0(match_count, NUM_VALUES - i));
if (codec.decode(model) == 1) {
result.push_back(i);
--match_count;
}
if (match_count == 0) {
break;
}
}
}
codec.stop_decoder();
return result;
}
private:
double get_probability_0(uint32_t match_count, uint32_t num_values = NUM_VALUES)
{
double probability_0(double(num_values - match_count) / num_values);
// Limit probability to match FastAC limitations...
return std::max(0.0001, std::min(0.9999, probability_0));
}
};
实用方法
实际上,设计一种新的压缩格式可能不值得。 事实上,将结果写为位可能甚至不值得,只需创建一个值为 0 或 1 的字节数组即可。 然后使用现有的压缩库 -- zlib 很常见,或者您可以尝试 lz4 或 snappy、bzip2、lzma...选择很多。
ZLib 示例
class zlib_codec
{
public:
zlib_codec(uint32_t bits_per_symbol) : bits_per_symbol(bits_per_symbol) {}
buffer_t compress(match_list_t const& matches)
{
match_symbols_t symbols(make_symbols(matches, bits_per_symbol));
z_stream defstream;
defstream.zalloc = nullptr;
defstream.zfree = nullptr;
defstream.opaque = nullptr;
deflateInit(&defstream, Z_BEST_COMPRESSION);
size_t max_compress_size = deflateBound(&defstream, static_cast<uLong>(symbols.size()));
buffer_t compressed(max_compress_size);
defstream.avail_in = static_cast<uInt>(symbols.size());
defstream.next_in = &symbols[0];
defstream.avail_out = static_cast<uInt>(max_compress_size);
defstream.next_out = &compressed[0];
deflate(&defstream, Z_FINISH);
deflateEnd(&defstream);
compressed.resize(defstream.total_out);
return compressed;
}
match_list_t decompress(buffer_t& compressed)
{
z_stream infstream;
infstream.zalloc = nullptr;
infstream.zfree = nullptr;
infstream.opaque = nullptr;
inflateInit(&infstream);
match_symbols_t symbols(symbol_count(bits_per_symbol));
infstream.avail_in = static_cast<uInt>(compressed.size());
infstream.next_in = &compressed[0];
infstream.avail_out = static_cast<uInt>(symbols.size());
infstream.next_out = &symbols[0];
inflate(&infstream, Z_FINISH);
inflateEnd(&infstream);
return make_matches(symbols, bits_per_symbol);
}
private:
uint32_t bits_per_symbol;
};
BZip2 示例
class bzip2_codec
{
public:
bzip2_codec(uint32_t bits_per_symbol) : bits_per_symbol(bits_per_symbol) {}
buffer_t compress(match_list_t const& matches)
{
match_symbols_t symbols(make_symbols(matches, bits_per_symbol));
uint32_t compressed_size = symbols.size() * 2;
buffer_t compressed(compressed_size);
int err = BZ2_bzBuffToBuffCompress((char*)&compressed[0]
, &compressed_size
, (char*)&symbols[0]
, symbols.size()
, 9
, 0
, 30);
if (err != BZ_OK) {
throw std::runtime_error("Compression error.");
}
compressed.resize(compressed_size);
return compressed;
}
match_list_t decompress(buffer_t& compressed)
{
match_symbols_t symbols(symbol_count(bits_per_symbol));
uint32_t decompressed_size = symbols.size();
int err = BZ2_bzBuffToBuffDecompress((char*)&symbols[0]
, &decompressed_size
, (char*)&compressed[0]
, compressed.size()
, 0
, 0);
if (err != BZ_OK) {
throw std::runtime_error("Compression error.");
}
if (decompressed_size != symbols.size()) {
throw std::runtime_error("Size mismatch.");
}
return make_matches(symbols, bits_per_symbol);
}
private:
uint32_t bits_per_symbol;
};
比较
代码存储库,包括 64 位的依赖项 Visual Studio 2015 年在 https://github.com/dan-masek/bounded_sorted_list_compression.git
存储排序整数的压缩列表在数据检索和数据库应用程序中极为常见,并且已经开发出多种技术。
我敢肯定,在你的列表中随机选择大约一半的项目将是你最糟糕的情况。
许多流行的 integer-list-compression 技术,例如 Roaring 位图,回退到使用(具有这样的 worst-case 输入数据)1-bit-per-index 位图。
因此,在您的情况下,如果有 100 万行,返回的最大负载大小将是(在最坏的情况下)设置了 "using a bitmap" 标志的 header, 后跟 100 万位(125,000 字节)的位图,例如,如果数据库中的第 700 行匹配,则位图的第 700 位设置为 1,如果数据库中的第 700 行不匹配,则位图的第 700 位设置为 0匹配。 (谢谢 Dan Mašek!)
我的理解是,虽然 quasi-succinct Elias-Fano 压缩和其他技术对于压缩许多 "naturally-occurring" 组排序整数非常有用,但对于这个 worst-case 数据集, none 其中的压缩比简单的位图更好,而大多数 "compression" 比简单的位图差得多。
(这类似于大多数 general-purpose 数据压缩算法的方式,例如 DEFLATE,当输入 "worst-case" 数据(例如 indistinguishable-from-random 加密数据时,创建 "compressed"带有几个字节开销的文件,设置了 "stored/raw/literal" 标志,后跟未压缩文件的简单副本。