如何使用自定义编解码器将 numpy 数组保存为字节?

How can I save a numpy array as bytes using a custom codec?

我有一个类型 int8 和形状 (100,100) 的 numpy 数组。我已经使用霍夫曼编码找到一种最有效的方式来编码其内容。对于我的特定用例,我试图保存的值大致正态分布在 0 左右,标准差约为 5,但这个问题适用于以任何分布优化保存 ndarray。就我而言,在极少数情况下可以观察到高达 -20 或 20 的极值。显然,使用霍夫曼编码这种类型的数组比使用标准的 8 位整数更有效 space。我的问题是如何做到这一点。

我尝试使用 np.save() 和 Pickle,但无法获得我想要的效率。具体来说,对于形状为 (100,100) 的数组,我使用 np.save() 得到一个大小为 10,128 字节的文件,这对于每个整数 1 个字节加上开销是有意义的。使用 pickle,我得到一个大小为 10,158 字节的文件,大致相同。然而,根据我的霍夫曼编码方案,我应该能够将我的特定测试用例(如下所示)中的数组内容编码为 144 字节! (不包括开销)

我尝试将数组中的每个整数映射到它的最佳字节字符串,所以我有一个字节数组(类型 S12),然后保存它,但我得到一个 118kb 的文件,同时使用 np.save()和泡菜,所以这显然行不通。

感谢您的帮助!

重现我的确切测试用例的代码:

import pickle
import numpy as np

# Seed random number generator
np.random.seed(1234)

# Build random normal array and round it 
test_array = np.random.normal(0, 5, size=(100, 100))
test_array = np.round(test_array).astype(np.int8)

# Set encoding dictionary
encoding_dict = {6: b'0000',
 -8: b'00010',
 8: b'00011',
 5: b'0010',
 -5: b'0011',
 12: b'0100000',
 -13: b'01000010',
 14: b'010000110',
 -15: b'0100001110',
 -14: b'0100001111',
 10: b'010001',
 7: b'01001',
 -4: b'0101',
 4: b'0110',
 -7: b'01110',
 11: b'0111100',
 -11: b'0111101',
 -12: b'01111100',
 13: b'011111010',
 -19: b'011111011000',
 -18: b'011111011001',
 -16: b'01111101101',
 -17: b'011111011100',
 16: b'011111011101',
 15: b'01111101111',
 -10: b'0111111',
 -3: b'1000',
 -6: b'10010',
 -9: b'100110',
 9: b'100111',
 3: b'1010',
 -2: b'1011',
 1: b'1100',
 2: b'1101',
 -1: b'1110',
 0: b'1111'}

# Save using different methods
np.save('test.npy', test_array)
with open('test.pkl', 'wb') as file:
    pickle.dump(test_array, file)

# Try converting to bytes and then saving
bytes_array = np.array([encoding_dict[key] for key in test_array.flatten()]).reshape(test_array.shape)
np.save('test_bytes.npy', bytes_array)
with open('test_bytes.pkl', 'wb') as file:
    pickle.dump(bytes_array, file)

# See how many bytes it should take
tmp_flat = test_array.flatten()
tmp_bytes = np.zeros_like(tmp_flat)
for i in range(len(tmp_bytes)):
    tmp_bytes[i] = len(encoding_dict[tmp_flat[i]]) / 8
print(tmp_bytes.sum())

我没有使用过这种编码,但我怀疑你的 144 字节是否是一个准确的度量。

您的 bytes_array 是 100 个元素,每个元素 12 个字节 ('S12'),或者是 test_array 大小的 12 倍(每个元素 1 个字节)。

如果我们列一个清单:

In [440]: alist = [encoding_dict[key] for key in test_array.flatten()]
In [441]: len(alist)
Out[441]: 10000
In [442]: alist[:10]
Out[442]: 
[b'1101',
 b'10010',
 b'01001',
 b'1011',
 b'0101',
 b'0110',
 b'0110',
 b'1000',
 b'1111',
 b'0111101']

并查看这些字符串的长度:

In [444]: sum([len(i) for i in alist])
Out[444]: 43938

每个元素平均 4 个字节。即使我们能以某种方式将这些字节转换为位,也只是 50% 的压缩率:

In [445]: _/8
Out[445]: 5492.25

您无法对您描述的数据进行 70 倍的压缩。你为什么这么认为?

即使输入字节被限制为四个值,您也能得到最好的四压缩因子。 (8 位超过 2 位。)您有一个正态分布,其中 10 或 11 个值正好在 ±1 西格玛范围内。

也许您可以通过统计数据将随机字节压缩两倍。美好的一天。

更新:

刚刚计算了您的分布的熵,假设标准差为 5。我得到每个样本的熵为 4.37 位。所以我对二分之一的估计过于乐观了。更像是 1.8 的因数。

顺便说一下,您不需要手动执行此操作。您可以将 zlibZ_HUFFMAN_ONLY 策略结合使用。它将为您生成最佳霍夫曼编码。

你的错误在这里:

tmp_bytes = np.zeros_like(tmp_flat)

tmp_flat 是一个 int8 数组,因此语句 tmp_bytes[i] = len(encoding_dict[tmp_flat[i]]) / 8 在将浮点值转换为 int 时截断了很多数字。将有问题的行替换为:

tmp_bytes = np.zeros(tmp_flat.shape, np.single)

但要说明如何实际进行压缩:我建议使用 np.packbits,它实际上会为您创建一个 5493 字节的数组。

# Make a string of all the data
s = b''.join(encoding_dict[key] for key in test_array.ravel())
# Convert the string into an array
a = np.array(list(map(int, s.decode('ascii'))))
# pack it
result = np.packbits(a)

语句 a = ... 做了很多额外的工作,因为它解码数据,然后复制它,然后无数次地将字符串转换为整数,等等。这是一个更长但更多的更有效的方法:

s = bytearray(b'').join(encoding_dict[key] for key in test_array.ravel())
a = np.array(s)
a -= ord('0')    # A now contains just 0 and 1
result = np.packbits(a)

存储此数组时,请确保包含预期的位数,而不是字节数。顺便说一下,你可以使用 np.unpackbits, which supports a count parameter specifically for this purpose (my addition 解压成二进制字符串。

最后一点,尽可能使用 ravel 而不是 flatten。后者总是复制,而前者一般不会。