如何使用自定义编解码器将 numpy 数组保存为字节?
How can I save a numpy array as bytes using a custom codec?
我有一个类型 int8
和形状 (100,100)
的 numpy 数组。我已经使用霍夫曼编码找到一种最有效的方式来编码其内容。对于我的特定用例,我试图保存的值大致正态分布在 0 左右,标准差约为 5,但这个问题适用于以任何分布优化保存 ndarray。就我而言,在极少数情况下可以观察到高达 -20 或 20 的极值。显然,使用霍夫曼编码这种类型的数组比使用标准的 8 位整数更有效 space。我的问题是如何做到这一点。
我尝试使用 np.save()
和 Pickle,但无法获得我想要的效率。具体来说,对于形状为 (100,100)
的数组,我使用 np.save()
得到一个大小为 10,128 字节的文件,这对于每个整数 1 个字节加上开销是有意义的。使用 pickle,我得到一个大小为 10,158 字节的文件,大致相同。然而,根据我的霍夫曼编码方案,我应该能够将我的特定测试用例(如下所示)中的数组内容编码为 144 字节! (不包括开销)
我尝试将数组中的每个整数映射到它的最佳字节字符串,所以我有一个字节数组(类型 S12
),然后保存它,但我得到一个 118kb 的文件,同时使用 np.save()
和泡菜,所以这显然行不通。
感谢您的帮助!
重现我的确切测试用例的代码:
import pickle
import numpy as np
# Seed random number generator
np.random.seed(1234)
# Build random normal array and round it
test_array = np.random.normal(0, 5, size=(100, 100))
test_array = np.round(test_array).astype(np.int8)
# Set encoding dictionary
encoding_dict = {6: b'0000',
-8: b'00010',
8: b'00011',
5: b'0010',
-5: b'0011',
12: b'0100000',
-13: b'01000010',
14: b'010000110',
-15: b'0100001110',
-14: b'0100001111',
10: b'010001',
7: b'01001',
-4: b'0101',
4: b'0110',
-7: b'01110',
11: b'0111100',
-11: b'0111101',
-12: b'01111100',
13: b'011111010',
-19: b'011111011000',
-18: b'011111011001',
-16: b'01111101101',
-17: b'011111011100',
16: b'011111011101',
15: b'01111101111',
-10: b'0111111',
-3: b'1000',
-6: b'10010',
-9: b'100110',
9: b'100111',
3: b'1010',
-2: b'1011',
1: b'1100',
2: b'1101',
-1: b'1110',
0: b'1111'}
# Save using different methods
np.save('test.npy', test_array)
with open('test.pkl', 'wb') as file:
pickle.dump(test_array, file)
# Try converting to bytes and then saving
bytes_array = np.array([encoding_dict[key] for key in test_array.flatten()]).reshape(test_array.shape)
np.save('test_bytes.npy', bytes_array)
with open('test_bytes.pkl', 'wb') as file:
pickle.dump(bytes_array, file)
# See how many bytes it should take
tmp_flat = test_array.flatten()
tmp_bytes = np.zeros_like(tmp_flat)
for i in range(len(tmp_bytes)):
tmp_bytes[i] = len(encoding_dict[tmp_flat[i]]) / 8
print(tmp_bytes.sum())
我没有使用过这种编码,但我怀疑你的 144 字节是否是一个准确的度量。
您的 bytes_array
是 100 个元素,每个元素 12 个字节 ('S12'),或者是 test_array
大小的 12 倍(每个元素 1 个字节)。
如果我们列一个清单:
In [440]: alist = [encoding_dict[key] for key in test_array.flatten()]
In [441]: len(alist)
Out[441]: 10000
In [442]: alist[:10]
Out[442]:
[b'1101',
b'10010',
b'01001',
b'1011',
b'0101',
b'0110',
b'0110',
b'1000',
b'1111',
b'0111101']
并查看这些字符串的长度:
In [444]: sum([len(i) for i in alist])
Out[444]: 43938
每个元素平均 4 个字节。即使我们能以某种方式将这些字节转换为位,也只是 50% 的压缩率:
In [445]: _/8
Out[445]: 5492.25
您无法对您描述的数据进行 70 倍的压缩。你为什么这么认为?
即使输入字节被限制为四个值,您也能得到最好的四压缩因子。 (8 位超过 2 位。)您有一个正态分布,其中 10 或 11 个值正好在 ±1 西格玛范围内。
也许您可以通过统计数据将随机字节压缩两倍。美好的一天。
更新:
刚刚计算了您的分布的熵,假设标准差为 5。我得到每个样本的熵为 4.37 位。所以我对二分之一的估计过于乐观了。更像是 1.8 的因数。
顺便说一下,您不需要手动执行此操作。您可以将 zlib 与 Z_HUFFMAN_ONLY
策略结合使用。它将为您生成最佳霍夫曼编码。
你的错误在这里:
tmp_bytes = np.zeros_like(tmp_flat)
tmp_flat
是一个 int8
数组,因此语句 tmp_bytes[i] = len(encoding_dict[tmp_flat[i]]) / 8
在将浮点值转换为 int 时截断了很多数字。将有问题的行替换为:
tmp_bytes = np.zeros(tmp_flat.shape, np.single)
但要说明如何实际进行压缩:我建议使用 np.packbits
,它实际上会为您创建一个 5493 字节的数组。
# Make a string of all the data
s = b''.join(encoding_dict[key] for key in test_array.ravel())
# Convert the string into an array
a = np.array(list(map(int, s.decode('ascii'))))
# pack it
result = np.packbits(a)
语句 a = ...
做了很多额外的工作,因为它解码数据,然后复制它,然后无数次地将字符串转换为整数,等等。这是一个更长但更多的更有效的方法:
s = bytearray(b'').join(encoding_dict[key] for key in test_array.ravel())
a = np.array(s)
a -= ord('0') # A now contains just 0 and 1
result = np.packbits(a)
存储此数组时,请确保包含预期的位数,而不是字节数。顺便说一下,你可以使用 np.unpackbits
, which supports a count
parameter specifically for this purpose (my addition 解压成二进制字符串。
最后一点,尽可能使用 ravel
而不是 flatten
。后者总是复制,而前者一般不会。
我有一个类型 int8
和形状 (100,100)
的 numpy 数组。我已经使用霍夫曼编码找到一种最有效的方式来编码其内容。对于我的特定用例,我试图保存的值大致正态分布在 0 左右,标准差约为 5,但这个问题适用于以任何分布优化保存 ndarray。就我而言,在极少数情况下可以观察到高达 -20 或 20 的极值。显然,使用霍夫曼编码这种类型的数组比使用标准的 8 位整数更有效 space。我的问题是如何做到这一点。
我尝试使用 np.save()
和 Pickle,但无法获得我想要的效率。具体来说,对于形状为 (100,100)
的数组,我使用 np.save()
得到一个大小为 10,128 字节的文件,这对于每个整数 1 个字节加上开销是有意义的。使用 pickle,我得到一个大小为 10,158 字节的文件,大致相同。然而,根据我的霍夫曼编码方案,我应该能够将我的特定测试用例(如下所示)中的数组内容编码为 144 字节! (不包括开销)
我尝试将数组中的每个整数映射到它的最佳字节字符串,所以我有一个字节数组(类型 S12
),然后保存它,但我得到一个 118kb 的文件,同时使用 np.save()
和泡菜,所以这显然行不通。
感谢您的帮助!
重现我的确切测试用例的代码:
import pickle
import numpy as np
# Seed random number generator
np.random.seed(1234)
# Build random normal array and round it
test_array = np.random.normal(0, 5, size=(100, 100))
test_array = np.round(test_array).astype(np.int8)
# Set encoding dictionary
encoding_dict = {6: b'0000',
-8: b'00010',
8: b'00011',
5: b'0010',
-5: b'0011',
12: b'0100000',
-13: b'01000010',
14: b'010000110',
-15: b'0100001110',
-14: b'0100001111',
10: b'010001',
7: b'01001',
-4: b'0101',
4: b'0110',
-7: b'01110',
11: b'0111100',
-11: b'0111101',
-12: b'01111100',
13: b'011111010',
-19: b'011111011000',
-18: b'011111011001',
-16: b'01111101101',
-17: b'011111011100',
16: b'011111011101',
15: b'01111101111',
-10: b'0111111',
-3: b'1000',
-6: b'10010',
-9: b'100110',
9: b'100111',
3: b'1010',
-2: b'1011',
1: b'1100',
2: b'1101',
-1: b'1110',
0: b'1111'}
# Save using different methods
np.save('test.npy', test_array)
with open('test.pkl', 'wb') as file:
pickle.dump(test_array, file)
# Try converting to bytes and then saving
bytes_array = np.array([encoding_dict[key] for key in test_array.flatten()]).reshape(test_array.shape)
np.save('test_bytes.npy', bytes_array)
with open('test_bytes.pkl', 'wb') as file:
pickle.dump(bytes_array, file)
# See how many bytes it should take
tmp_flat = test_array.flatten()
tmp_bytes = np.zeros_like(tmp_flat)
for i in range(len(tmp_bytes)):
tmp_bytes[i] = len(encoding_dict[tmp_flat[i]]) / 8
print(tmp_bytes.sum())
我没有使用过这种编码,但我怀疑你的 144 字节是否是一个准确的度量。
您的 bytes_array
是 100 个元素,每个元素 12 个字节 ('S12'),或者是 test_array
大小的 12 倍(每个元素 1 个字节)。
如果我们列一个清单:
In [440]: alist = [encoding_dict[key] for key in test_array.flatten()]
In [441]: len(alist)
Out[441]: 10000
In [442]: alist[:10]
Out[442]:
[b'1101',
b'10010',
b'01001',
b'1011',
b'0101',
b'0110',
b'0110',
b'1000',
b'1111',
b'0111101']
并查看这些字符串的长度:
In [444]: sum([len(i) for i in alist])
Out[444]: 43938
每个元素平均 4 个字节。即使我们能以某种方式将这些字节转换为位,也只是 50% 的压缩率:
In [445]: _/8
Out[445]: 5492.25
您无法对您描述的数据进行 70 倍的压缩。你为什么这么认为?
即使输入字节被限制为四个值,您也能得到最好的四压缩因子。 (8 位超过 2 位。)您有一个正态分布,其中 10 或 11 个值正好在 ±1 西格玛范围内。
也许您可以通过统计数据将随机字节压缩两倍。美好的一天。
更新:
刚刚计算了您的分布的熵,假设标准差为 5。我得到每个样本的熵为 4.37 位。所以我对二分之一的估计过于乐观了。更像是 1.8 的因数。
顺便说一下,您不需要手动执行此操作。您可以将 zlib 与 Z_HUFFMAN_ONLY
策略结合使用。它将为您生成最佳霍夫曼编码。
你的错误在这里:
tmp_bytes = np.zeros_like(tmp_flat)
tmp_flat
是一个 int8
数组,因此语句 tmp_bytes[i] = len(encoding_dict[tmp_flat[i]]) / 8
在将浮点值转换为 int 时截断了很多数字。将有问题的行替换为:
tmp_bytes = np.zeros(tmp_flat.shape, np.single)
但要说明如何实际进行压缩:我建议使用 np.packbits
,它实际上会为您创建一个 5493 字节的数组。
# Make a string of all the data
s = b''.join(encoding_dict[key] for key in test_array.ravel())
# Convert the string into an array
a = np.array(list(map(int, s.decode('ascii'))))
# pack it
result = np.packbits(a)
语句 a = ...
做了很多额外的工作,因为它解码数据,然后复制它,然后无数次地将字符串转换为整数,等等。这是一个更长但更多的更有效的方法:
s = bytearray(b'').join(encoding_dict[key] for key in test_array.ravel())
a = np.array(s)
a -= ord('0') # A now contains just 0 and 1
result = np.packbits(a)
存储此数组时,请确保包含预期的位数,而不是字节数。顺便说一下,你可以使用 np.unpackbits
, which supports a count
parameter specifically for this purpose (my addition 解压成二进制字符串。
最后一点,尽可能使用 ravel
而不是 flatten
。后者总是复制,而前者一般不会。