Python,使用 Elias Gamma 快速压缩大量数字

Python, fast compression of large amount of numbers with Elias Gamma

我们有一个二维列表,如果需要我们可以将它转换成任何东西。每行包含一些正整数(原始递增数的增量)。总共 20 亿个数字,一半以上等于 1。使用 Elias-Gamma 编码时,我们可以使用每个数字大约 3 位来逐行编码 2d 列表(稍后我们将使用行索引访问任意行)基于分布的计算。但是,我们的程序已经运行ning了12个小时了,还是没有完成编码。 这是我们正在做的事情:

from bitstring import BitArray
def _compress_2d_list(input: List[List[int]]) -> List[BitArray]:
    res = []
    for row in input:
        res.append(sum(_elias_gamma_compress_number(num) for num in row))
    return res 

def _elias_gamma_compress_number(x: int) -> BitArray:
    n = _log_floor(x)
    return BitArray(bin="0" * n) + BitArray(uint=x, length=_log_floor(x) + 1)

def log_floor(num: int) -> int:
    return floor(log(num, 2))

呼叫者:

input_2d_list: List[List[int]]  # containing 1.5M lists, total 2B numbers
compressed_list = _compress_2d_list(input_2d_list)

如何优化我的代码以使其 运行 更快?我的意思是,更快......我可以使用任何可靠的流行库或数据结构。

此外,我们如何使用 BitStream 更快地解压缩?目前我一个一个读取前缀0,然后在while循环中读取压缩数字的二进制。也不是很快...

一些简单的优化导致了三倍的改进:

def _compress_2d_list(input):
    res = []
    for row in input:
        res.append(BitArray('').join(BitArray(uint=x, length=2*x.bit_length()-1) for x in row))
    return res

但是,我认为您需要比这更好的东西。在我的机器上,这将在大约 12 小时内完成,处理 150 万个列表,每个列表有 1400 个增量。

在 C 中编码大约需要一分钟。解码大约需要 15 秒。

如果您对 numpy“位域”没问题,您可以在几分钟内完成压缩。解码速度慢了三倍,但仍然是几分钟的事。

样本运行:

# create example (1'000'000 numbers) 
a = make_example()
a
# array([2, 1, 1, ..., 3, 4, 3])

b,n = encode(a) # takes ~100 ms on my machine
c = decode(b,n) #       ~300 ms

# check round trip
(a==c).all()
# True

代码:

import numpy as np
    
def make_example():
    a = np.random.choice(2000000,replace=False,size=1000001)
    a.sort()
    return np.diff(a)

def encode(a):
    a = a.view(f'u{a.itemsize}')
    l = np.log2(a).astype('u1')
    L = ((l<<1)+1).cumsum()
    out = np.zeros(L[-1],'u1')
    for i in range(l.max()+1):
        out[L-i-1] += (a>>i)&1
    return np.packbits(out),out.size

def decode(b,n):
    b = np.unpackbits(b,count=n).view(bool)
    s = b.nonzero()[0]
    s = (s<<1).repeat(np.diff(s,prepend=-1))
    s -= np.arange(-1,len(s)-1)
    s = s.tolist() # list has faster __getitem__
    ns = len(s)
    def gen():
        idx = 0
        yield idx
        while idx < ns:
            idx = s[idx]
            yield idx
    offs = np.fromiter(gen(),int)
    sz = np.diff(offs)>>1
    mx = sz.max()+1
    out = np.zeros(offs.size-1,int)
    for i in range(mx):
        out[b[offs[1:]-i-1] & (sz>=i)] += 1<<i
    return out