Python,使用 Elias Gamma 快速压缩大量数字
Python, fast compression of large amount of numbers with Elias Gamma
我们有一个二维列表,如果需要我们可以将它转换成任何东西。每行包含一些正整数(原始递增数的增量)。总共 20 亿个数字,一半以上等于 1。使用 Elias-Gamma 编码时,我们可以使用每个数字大约 3 位来逐行编码 2d 列表(稍后我们将使用行索引访问任意行)基于分布的计算。但是,我们的程序已经运行ning了12个小时了,还是没有完成编码。
这是我们正在做的事情:
from bitstring import BitArray
def _compress_2d_list(input: List[List[int]]) -> List[BitArray]:
res = []
for row in input:
res.append(sum(_elias_gamma_compress_number(num) for num in row))
return res
def _elias_gamma_compress_number(x: int) -> BitArray:
n = _log_floor(x)
return BitArray(bin="0" * n) + BitArray(uint=x, length=_log_floor(x) + 1)
def log_floor(num: int) -> int:
return floor(log(num, 2))
呼叫者:
input_2d_list: List[List[int]] # containing 1.5M lists, total 2B numbers
compressed_list = _compress_2d_list(input_2d_list)
如何优化我的代码以使其 运行 更快?我的意思是,更快......我可以使用任何可靠的流行库或数据结构。
此外,我们如何使用 BitStream
更快地解压缩?目前我一个一个读取前缀0,然后在while循环中读取压缩数字的二进制。也不是很快...
一些简单的优化导致了三倍的改进:
def _compress_2d_list(input):
res = []
for row in input:
res.append(BitArray('').join(BitArray(uint=x, length=2*x.bit_length()-1) for x in row))
return res
但是,我认为您需要比这更好的东西。在我的机器上,这将在大约 12 小时内完成,处理 150 万个列表,每个列表有 1400 个增量。
在 C 中编码大约需要一分钟。解码大约需要 15 秒。
如果您对 numpy
“位域”没问题,您可以在几分钟内完成压缩。解码速度慢了三倍,但仍然是几分钟的事。
样本运行:
# create example (1'000'000 numbers)
a = make_example()
a
# array([2, 1, 1, ..., 3, 4, 3])
b,n = encode(a) # takes ~100 ms on my machine
c = decode(b,n) # ~300 ms
# check round trip
(a==c).all()
# True
代码:
import numpy as np
def make_example():
a = np.random.choice(2000000,replace=False,size=1000001)
a.sort()
return np.diff(a)
def encode(a):
a = a.view(f'u{a.itemsize}')
l = np.log2(a).astype('u1')
L = ((l<<1)+1).cumsum()
out = np.zeros(L[-1],'u1')
for i in range(l.max()+1):
out[L-i-1] += (a>>i)&1
return np.packbits(out),out.size
def decode(b,n):
b = np.unpackbits(b,count=n).view(bool)
s = b.nonzero()[0]
s = (s<<1).repeat(np.diff(s,prepend=-1))
s -= np.arange(-1,len(s)-1)
s = s.tolist() # list has faster __getitem__
ns = len(s)
def gen():
idx = 0
yield idx
while idx < ns:
idx = s[idx]
yield idx
offs = np.fromiter(gen(),int)
sz = np.diff(offs)>>1
mx = sz.max()+1
out = np.zeros(offs.size-1,int)
for i in range(mx):
out[b[offs[1:]-i-1] & (sz>=i)] += 1<<i
return out
我们有一个二维列表,如果需要我们可以将它转换成任何东西。每行包含一些正整数(原始递增数的增量)。总共 20 亿个数字,一半以上等于 1。使用 Elias-Gamma 编码时,我们可以使用每个数字大约 3 位来逐行编码 2d 列表(稍后我们将使用行索引访问任意行)基于分布的计算。但是,我们的程序已经运行ning了12个小时了,还是没有完成编码。 这是我们正在做的事情:
from bitstring import BitArray
def _compress_2d_list(input: List[List[int]]) -> List[BitArray]:
res = []
for row in input:
res.append(sum(_elias_gamma_compress_number(num) for num in row))
return res
def _elias_gamma_compress_number(x: int) -> BitArray:
n = _log_floor(x)
return BitArray(bin="0" * n) + BitArray(uint=x, length=_log_floor(x) + 1)
def log_floor(num: int) -> int:
return floor(log(num, 2))
呼叫者:
input_2d_list: List[List[int]] # containing 1.5M lists, total 2B numbers
compressed_list = _compress_2d_list(input_2d_list)
如何优化我的代码以使其 运行 更快?我的意思是,更快......我可以使用任何可靠的流行库或数据结构。
此外,我们如何使用 BitStream
更快地解压缩?目前我一个一个读取前缀0,然后在while循环中读取压缩数字的二进制。也不是很快...
一些简单的优化导致了三倍的改进:
def _compress_2d_list(input):
res = []
for row in input:
res.append(BitArray('').join(BitArray(uint=x, length=2*x.bit_length()-1) for x in row))
return res
但是,我认为您需要比这更好的东西。在我的机器上,这将在大约 12 小时内完成,处理 150 万个列表,每个列表有 1400 个增量。
在 C 中编码大约需要一分钟。解码大约需要 15 秒。
如果您对 numpy
“位域”没问题,您可以在几分钟内完成压缩。解码速度慢了三倍,但仍然是几分钟的事。
样本运行:
# create example (1'000'000 numbers)
a = make_example()
a
# array([2, 1, 1, ..., 3, 4, 3])
b,n = encode(a) # takes ~100 ms on my machine
c = decode(b,n) # ~300 ms
# check round trip
(a==c).all()
# True
代码:
import numpy as np
def make_example():
a = np.random.choice(2000000,replace=False,size=1000001)
a.sort()
return np.diff(a)
def encode(a):
a = a.view(f'u{a.itemsize}')
l = np.log2(a).astype('u1')
L = ((l<<1)+1).cumsum()
out = np.zeros(L[-1],'u1')
for i in range(l.max()+1):
out[L-i-1] += (a>>i)&1
return np.packbits(out),out.size
def decode(b,n):
b = np.unpackbits(b,count=n).view(bool)
s = b.nonzero()[0]
s = (s<<1).repeat(np.diff(s,prepend=-1))
s -= np.arange(-1,len(s)-1)
s = s.tolist() # list has faster __getitem__
ns = len(s)
def gen():
idx = 0
yield idx
while idx < ns:
idx = s[idx]
yield idx
offs = np.fromiter(gen(),int)
sz = np.diff(offs)>>1
mx = sz.max()+1
out = np.zeros(offs.size-1,int)
for i in range(mx):
out[b[offs[1:]-i-1] & (sz>=i)] += 1<<i
return out