Python:读取12位二进制文件
Python: reading 12-bit binary files
我正在尝试使用 Python 3.
读取包含图像(视频)的 12 位二进制文件
要读取以 16 位编码的类似文件,以下方法非常有效:
import numpy as np
images = np.memmap(filename_video, dtype=np.uint16, mode='r', shape=(nb_frames, height, width))
其中 filename_video 是文件,nb_frames 可以从另一个文件读取视频的高度和宽度特征。 'working very well' 我的意思是速度快:在我的计算机上读取一个 640x256 的 140 帧视频大约需要 1 毫秒。
据我所知,当文件以 12 位编码时我无法使用它,因为没有 uint12 类型。所以我想做的是读取一个 12 位文件并将其存储在一个 16 位 uint 数组中。以下摘自 (),有效:
with open(filename_video, 'rb') as f:
data=f.read()
images=np.zeros(int(2*len(data)/3),dtype=np.uint16)
ii=0
for jj in range(0,int(len(data))-2,3):
a=bitstring.Bits(bytes=data[jj:jj+3],length=24)
images[ii],images[ii+1] = a.unpack('uint:12,uint:12')
ii=ii+2
images = np.reshape(images,(nb_frames,height,width))
但是,这非常慢:使用我的机器读取只有 5 帧的 640x256 视频大约需要 11.5 秒。理想情况下,我希望能够像使用 memmap 读取 8 位或 16 位文件一样高效地读取 12 位文件。或者至少不会慢 10^5 倍。我怎样才能加快速度?
这是一个文件示例:
http://s000.tinyupload.com/index.php?file_id=26973488795334213426
(nb_frames=5,高度=256,宽度=640)。
加速 numpy 向量化方法的一种方法是避免为临时数据分配昂贵的内存,更有效地使用缓存并利用并行化。使用 Numba
、Cython
或 C
可以很容易地完成此操作。请注意,并行化并不总是有益的。如果要转换的数组太小,使用单线程版本(parallel=False
)
Numba 版本的 Cyril Gaudefroy 回答临时内存分配
import numba as nb
import numpy as np
@nb.njit(nb.uint16[::1](nb.uint8[::1]),fastmath=True,parallel=True)
def nb_read_uint12(data_chunk):
"""data_chunk is a contigous 1D array of uint8 data)
eg.data_chunk = np.frombuffer(data_chunk, dtype=np.uint8)"""
#ensure that the data_chunk has the right length
assert np.mod(data_chunk.shape[0],3)==0
out=np.empty(data_chunk.shape[0]//3*2,dtype=np.uint16)
for i in nb.prange(data_chunk.shape[0]//3):
fst_uint8=np.uint16(data_chunk[i*3])
mid_uint8=np.uint16(data_chunk[i*3+1])
lst_uint8=np.uint16(data_chunk[i*3+2])
out[i*2] = (fst_uint8 << 4) + (mid_uint8 >> 4)
out[i*2+1] = ((mid_uint8 % 16) << 8) + lst_uint8
return out
Numba 版本的 Cyril Gaudefroy 答案带有内存预分配
如果对大小相似的数据块多次应用此函数,则只能预分配输出数组一次。
@nb.njit(nb.uint16[::1](nb.uint8[::1],nb.uint16[::1]),fastmath=True,parallel=True,cache=True)
def nb_read_uint12_prealloc(data_chunk,out):
"""data_chunk is a contigous 1D array of uint8 data)
eg.data_chunk = np.frombuffer(data_chunk, dtype=np.uint8)"""
#ensure that the data_chunk has the right length
assert np.mod(data_chunk.shape[0],3)==0
assert out.shape[0]==data_chunk.shape[0]//3*2
for i in nb.prange(data_chunk.shape[0]//3):
fst_uint8=np.uint16(data_chunk[i*3])
mid_uint8=np.uint16(data_chunk[i*3+1])
lst_uint8=np.uint16(data_chunk[i*3+2])
out[i*2] = (fst_uint8 << 4) + (mid_uint8 >> 4)
out[i*2+1] = ((mid_uint8 % 16) << 8) + lst_uint8
return out
带有临时内存分配的 DGrifffith 答案的 Numba 版本
@nb.njit(nb.uint16[::1](nb.uint8[::1]),fastmath=True,parallel=True,cache=True)
def read_uint12_var_2(data_chunk):
"""data_chunk is a contigous 1D array of uint8 data)
eg.data_chunk = np.frombuffer(data_chunk, dtype=np.uint8)"""
#ensure that the data_chunk has the right length
assert np.mod(data_chunk.shape[0],3)==0
out=np.empty(data_chunk.shape[0]//3*2,dtype=np.uint16)
for i in nb.prange(data_chunk.shape[0]//3):
fst_uint8=np.uint16(data_chunk[i*3])
mid_uint8=np.uint16(data_chunk[i*3+1])
lst_uint8=np.uint16(data_chunk[i*3+2])
out[i*2] = (fst_uint8 << 4) + (mid_uint8 >> 4)
out[i*2+1] = (lst_uint8 << 4) + (15 & mid_uint8)
return out
带内存预分配的 DGrifffith 答案的 Numba 版本
@nb.njit(nb.uint16[::1](nb.uint8[::1],nb.uint16[::1]),fastmath=True,parallel=True,cache=True)
def read_uint12_var_2_prealloc(data_chunk,out):
"""data_chunk is a contigous 1D array of uint8 data)
eg.data_chunk = np.frombuffer(data_chunk, dtype=np.uint8)"""
#ensure that the data_chunk has the right length
assert np.mod(data_chunk.shape[0],3)==0
assert out.shape[0]==data_chunk.shape[0]//3*2
for i in nb.prange(data_chunk.shape[0]//3):
fst_uint8=np.uint16(data_chunk[i*3])
mid_uint8=np.uint16(data_chunk[i*3+1])
lst_uint8=np.uint16(data_chunk[i*3+2])
out[i*2] = (fst_uint8 << 4) + (mid_uint8 >> 4)
out[i*2+1] = (lst_uint8 << 4) + (15 & mid_uint8)
return out
计时
num_Frames=10
data_chunk=np.random.randint(low=0,high=255,size=np.int(640*256*1.5*num_Frames),dtype=np.uint8)
%timeit read_uint12_gaud(data_chunk)
#11.3 ms ± 53.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
#435 MB/s
%timeit nb_read_uint12(data_chunk)
#939 µs ± 24.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#5235 MB/s
out=np.empty(data_chunk.shape[0]//3*2,dtype=np.uint16)
%timeit nb_read_uint12_prealloc(data_chunk,out)
#407 µs ± 5.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#11759 MB/s
%timeit read_uint12_griff(data_chunk)
#10.2 ms ± 55.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
#491 MB/s
%timeit read_uint12_var_2(data_chunk)
#928 µs ± 16.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#5297 MB/s
%timeit read_uint12_var_2_prealloc(data_chunk,out)
#403 µs ± 13.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#12227 MB/s
我的实现与@max9111 提出的实现略有不同,它不需要调用 unpackbits
。
它通过将中间字节切成两半并使用numpy的二进制操作,直接从三个连续的uint8
中创建两个uint12
值。下面假定 data_chunks
是一个二进制字符串,包含任意数量的 12 位整数的信息(因此它的长度必须是 3 的倍数)。
def read_uint12(data_chunk):
data = np.frombuffer(data_chunk, dtype=np.uint8)
fst_uint8, mid_uint8, lst_uint8 = np.reshape(data, (data.shape[0] // 3, 3)).astype(np.uint16).T
fst_uint12 = (fst_uint8 << 4) + (mid_uint8 >> 4)
snd_uint12 = ((mid_uint8 % 16) << 8) + lst_uint8
return np.reshape(np.concatenate((fst_uint12[:, None], snd_uint12[:, None]), axis=1), 2 * fst_uint12.shape[0])
我对其他实现进行了基准测试,事实证明这种方法在 ~5 Mb 输入上快 ~4 倍:
read_uint12_unpackbits
每个循环 65.5 毫秒 ± 1.11 毫秒(7 次运行的平均值 ± 标准偏差,每次 10 次循环)
read_uint12
每个循环 14 ms ± 513 µs(7 次运行的平均值 ± 标准差,每次 100 次循环)
发现@cyrilgaudefroy 的回答很有用。但是,最初,它不适用于我的 12 位压缩二进制图像数据。发现在这种特殊情况下包装有点不同。 "middle" 字节包含最低有效的半字节。三元组的字节 1 和 3 是 12 位中最高的 8 位。因此修改@cyrilgaudefroy 回答:
def read_uint12(data_chunk):
data = np.frombuffer(data_chunk, dtype=np.uint8)
fst_uint8, mid_uint8, lst_uint8 = np.reshape(data, (data.shape[0] // 3, 3)).astype(np.uint16).T
fst_uint12 = (fst_uint8 << 4) + (mid_uint8 >> 4)
snd_uint12 = (lst_uint8 << 4) + (np.bitwise_and(15, mid_uint8))
return np.reshape(np.concatenate((fst_uint12[:, None], snd_uint12[:, None]), axis=1), 2 * fst_uint12.shape[0])
这是另一种变体。我的数据格式是:
第一个 uint12:第二个 uint8 的最低 4 位的最高 4 位 + 第一个 uint8 的最低 8 位
第二个 uint12:第三个 uint8 的最高 8 位 + 第二个 uint8 的最高 4 位的最低 4 位
对应的代码为:
def read_uint12(data_chunk):
data = np.frombuffer(data_chunk, dtype=np.uint8)
fst_uint8, mid_uint8, lst_uint8 = np.reshape(data, (data.shape[0] // 3, 3)).astype(np.uint16).T
fst_uint12 = ((mid_uint8 & 0x0F) << 8) | fst_uint8
snd_uint12 = (lst_uint8 << 4) | ((mid_uint8 & 0xF0) >> 4)
return np.reshape(np.concatenate((fst_uint12[:, None], snd_uint12[:, None]), axis=1), 2 * fst_uint12.shape[0])
我正在尝试使用 Python 3.
读取包含图像(视频)的 12 位二进制文件要读取以 16 位编码的类似文件,以下方法非常有效:
import numpy as np
images = np.memmap(filename_video, dtype=np.uint16, mode='r', shape=(nb_frames, height, width))
其中 filename_video 是文件,nb_frames 可以从另一个文件读取视频的高度和宽度特征。 'working very well' 我的意思是速度快:在我的计算机上读取一个 640x256 的 140 帧视频大约需要 1 毫秒。
据我所知,当文件以 12 位编码时我无法使用它,因为没有 uint12 类型。所以我想做的是读取一个 12 位文件并将其存储在一个 16 位 uint 数组中。以下摘自 (
with open(filename_video, 'rb') as f:
data=f.read()
images=np.zeros(int(2*len(data)/3),dtype=np.uint16)
ii=0
for jj in range(0,int(len(data))-2,3):
a=bitstring.Bits(bytes=data[jj:jj+3],length=24)
images[ii],images[ii+1] = a.unpack('uint:12,uint:12')
ii=ii+2
images = np.reshape(images,(nb_frames,height,width))
但是,这非常慢:使用我的机器读取只有 5 帧的 640x256 视频大约需要 11.5 秒。理想情况下,我希望能够像使用 memmap 读取 8 位或 16 位文件一样高效地读取 12 位文件。或者至少不会慢 10^5 倍。我怎样才能加快速度?
这是一个文件示例: http://s000.tinyupload.com/index.php?file_id=26973488795334213426 (nb_frames=5,高度=256,宽度=640)。
加速 numpy 向量化方法的一种方法是避免为临时数据分配昂贵的内存,更有效地使用缓存并利用并行化。使用 Numba
、Cython
或 C
可以很容易地完成此操作。请注意,并行化并不总是有益的。如果要转换的数组太小,使用单线程版本(parallel=False
)
Numba 版本的 Cyril Gaudefroy 回答临时内存分配
import numba as nb
import numpy as np
@nb.njit(nb.uint16[::1](nb.uint8[::1]),fastmath=True,parallel=True)
def nb_read_uint12(data_chunk):
"""data_chunk is a contigous 1D array of uint8 data)
eg.data_chunk = np.frombuffer(data_chunk, dtype=np.uint8)"""
#ensure that the data_chunk has the right length
assert np.mod(data_chunk.shape[0],3)==0
out=np.empty(data_chunk.shape[0]//3*2,dtype=np.uint16)
for i in nb.prange(data_chunk.shape[0]//3):
fst_uint8=np.uint16(data_chunk[i*3])
mid_uint8=np.uint16(data_chunk[i*3+1])
lst_uint8=np.uint16(data_chunk[i*3+2])
out[i*2] = (fst_uint8 << 4) + (mid_uint8 >> 4)
out[i*2+1] = ((mid_uint8 % 16) << 8) + lst_uint8
return out
Numba 版本的 Cyril Gaudefroy 答案带有内存预分配
如果对大小相似的数据块多次应用此函数,则只能预分配输出数组一次。
@nb.njit(nb.uint16[::1](nb.uint8[::1],nb.uint16[::1]),fastmath=True,parallel=True,cache=True)
def nb_read_uint12_prealloc(data_chunk,out):
"""data_chunk is a contigous 1D array of uint8 data)
eg.data_chunk = np.frombuffer(data_chunk, dtype=np.uint8)"""
#ensure that the data_chunk has the right length
assert np.mod(data_chunk.shape[0],3)==0
assert out.shape[0]==data_chunk.shape[0]//3*2
for i in nb.prange(data_chunk.shape[0]//3):
fst_uint8=np.uint16(data_chunk[i*3])
mid_uint8=np.uint16(data_chunk[i*3+1])
lst_uint8=np.uint16(data_chunk[i*3+2])
out[i*2] = (fst_uint8 << 4) + (mid_uint8 >> 4)
out[i*2+1] = ((mid_uint8 % 16) << 8) + lst_uint8
return out
带有临时内存分配的 DGrifffith 答案的 Numba 版本
@nb.njit(nb.uint16[::1](nb.uint8[::1]),fastmath=True,parallel=True,cache=True)
def read_uint12_var_2(data_chunk):
"""data_chunk is a contigous 1D array of uint8 data)
eg.data_chunk = np.frombuffer(data_chunk, dtype=np.uint8)"""
#ensure that the data_chunk has the right length
assert np.mod(data_chunk.shape[0],3)==0
out=np.empty(data_chunk.shape[0]//3*2,dtype=np.uint16)
for i in nb.prange(data_chunk.shape[0]//3):
fst_uint8=np.uint16(data_chunk[i*3])
mid_uint8=np.uint16(data_chunk[i*3+1])
lst_uint8=np.uint16(data_chunk[i*3+2])
out[i*2] = (fst_uint8 << 4) + (mid_uint8 >> 4)
out[i*2+1] = (lst_uint8 << 4) + (15 & mid_uint8)
return out
带内存预分配的 DGrifffith 答案的 Numba 版本
@nb.njit(nb.uint16[::1](nb.uint8[::1],nb.uint16[::1]),fastmath=True,parallel=True,cache=True)
def read_uint12_var_2_prealloc(data_chunk,out):
"""data_chunk is a contigous 1D array of uint8 data)
eg.data_chunk = np.frombuffer(data_chunk, dtype=np.uint8)"""
#ensure that the data_chunk has the right length
assert np.mod(data_chunk.shape[0],3)==0
assert out.shape[0]==data_chunk.shape[0]//3*2
for i in nb.prange(data_chunk.shape[0]//3):
fst_uint8=np.uint16(data_chunk[i*3])
mid_uint8=np.uint16(data_chunk[i*3+1])
lst_uint8=np.uint16(data_chunk[i*3+2])
out[i*2] = (fst_uint8 << 4) + (mid_uint8 >> 4)
out[i*2+1] = (lst_uint8 << 4) + (15 & mid_uint8)
return out
计时
num_Frames=10
data_chunk=np.random.randint(low=0,high=255,size=np.int(640*256*1.5*num_Frames),dtype=np.uint8)
%timeit read_uint12_gaud(data_chunk)
#11.3 ms ± 53.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
#435 MB/s
%timeit nb_read_uint12(data_chunk)
#939 µs ± 24.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#5235 MB/s
out=np.empty(data_chunk.shape[0]//3*2,dtype=np.uint16)
%timeit nb_read_uint12_prealloc(data_chunk,out)
#407 µs ± 5.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#11759 MB/s
%timeit read_uint12_griff(data_chunk)
#10.2 ms ± 55.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
#491 MB/s
%timeit read_uint12_var_2(data_chunk)
#928 µs ± 16.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#5297 MB/s
%timeit read_uint12_var_2_prealloc(data_chunk,out)
#403 µs ± 13.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#12227 MB/s
我的实现与@max9111 提出的实现略有不同,它不需要调用 unpackbits
。
它通过将中间字节切成两半并使用numpy的二进制操作,直接从三个连续的uint8
中创建两个uint12
值。下面假定 data_chunks
是一个二进制字符串,包含任意数量的 12 位整数的信息(因此它的长度必须是 3 的倍数)。
def read_uint12(data_chunk):
data = np.frombuffer(data_chunk, dtype=np.uint8)
fst_uint8, mid_uint8, lst_uint8 = np.reshape(data, (data.shape[0] // 3, 3)).astype(np.uint16).T
fst_uint12 = (fst_uint8 << 4) + (mid_uint8 >> 4)
snd_uint12 = ((mid_uint8 % 16) << 8) + lst_uint8
return np.reshape(np.concatenate((fst_uint12[:, None], snd_uint12[:, None]), axis=1), 2 * fst_uint12.shape[0])
我对其他实现进行了基准测试,事实证明这种方法在 ~5 Mb 输入上快 ~4 倍:
read_uint12_unpackbits
每个循环 65.5 毫秒 ± 1.11 毫秒(7 次运行的平均值 ± 标准偏差,每次 10 次循环)
read_uint12
每个循环 14 ms ± 513 µs(7 次运行的平均值 ± 标准差,每次 100 次循环)
发现@cyrilgaudefroy 的回答很有用。但是,最初,它不适用于我的 12 位压缩二进制图像数据。发现在这种特殊情况下包装有点不同。 "middle" 字节包含最低有效的半字节。三元组的字节 1 和 3 是 12 位中最高的 8 位。因此修改@cyrilgaudefroy 回答:
def read_uint12(data_chunk):
data = np.frombuffer(data_chunk, dtype=np.uint8)
fst_uint8, mid_uint8, lst_uint8 = np.reshape(data, (data.shape[0] // 3, 3)).astype(np.uint16).T
fst_uint12 = (fst_uint8 << 4) + (mid_uint8 >> 4)
snd_uint12 = (lst_uint8 << 4) + (np.bitwise_and(15, mid_uint8))
return np.reshape(np.concatenate((fst_uint12[:, None], snd_uint12[:, None]), axis=1), 2 * fst_uint12.shape[0])
这是另一种变体。我的数据格式是:
第一个 uint12:第二个 uint8 的最低 4 位的最高 4 位 + 第一个 uint8 的最低 8 位
第二个 uint12:第三个 uint8 的最高 8 位 + 第二个 uint8 的最高 4 位的最低 4 位
对应的代码为:
def read_uint12(data_chunk):
data = np.frombuffer(data_chunk, dtype=np.uint8)
fst_uint8, mid_uint8, lst_uint8 = np.reshape(data, (data.shape[0] // 3, 3)).astype(np.uint16).T
fst_uint12 = ((mid_uint8 & 0x0F) << 8) | fst_uint8
snd_uint12 = (lst_uint8 << 4) | ((mid_uint8 & 0xF0) >> 4)
return np.reshape(np.concatenate((fst_uint12[:, None], snd_uint12[:, None]), axis=1), 2 * fst_uint12.shape[0])