从单个文件读取和写入多个压缩帧

Reading and Writing multiple compressed frames from single file

我有非常大的字节数据存储在 txt 文件中(每个大小大约 1TB,(4 个深度相机 运行ning 24-7 同时写入)以大约每分钟 4GB 的速率写入)到硬盘开车。

每个文件都包含大量以字节编码的图像帧。每一帧都使用lz4压缩并写入相应的相机文件名,只有4个文件,但随着录制时间的延长,会增加。

当只有数据时,每一帧的大小都是未知的。

每个帧都由一个未压缩的字节数组模式分隔,该模式连接到已压缩帧的末尾。像这样:

                map_to_sort.sort()
                frame_ender_bytes = bytearray("<FRAME_END>", 'utf-8')
                for k, frame_compressed in map_to_sort.mapper.items():
                    with lz4.frm.open(frame_compressed[0], 'ab') as fp:
                        fp.write(frame_compressed[1] + frame_ender_bytes)

我正在尝试读取每个帧,对其进行解压缩,然后将未压缩的帧写入另一个 12TB 硬盘驱动器(在 18 小时内,或者最好更少,(比将数据写入和压缩到文件)。不幸的是,我无法在这个项目中使用大容量 SSD,所以我必须了解如何管理在硬盘上读取和写入压缩数据的瓶颈。

关于如何解决它,我提出了几个不同的想法,但是 none 其中似乎效果很好。 当然,我尝试过逐字节读取文件的一些简单方法(太长甚至无法读取)。 我也试过:

以大约每帧大小的块读取文件 并像这样使用拆分函数进行检查:

import lz4.frame as frm
import numpy as np
def chunky(stream):
    buffer = bytearray()
    while True: 
        chunk = stream.read(40096)
        if chunk:
            buffer += chunk
        else:
            break
        while True:
            combine = buffer.split(b'<FRAME_END>', 1)
            if len(combine) < 2:
                break
            else:
                buffer = combine[1]
                part = combine[0]
                yield part

#................................................
# Elsewhere in code
# dec_path is a string variable that stores the decompressed file path corresponding to the camera path file
with frm.open(camera_file_path, 'rb') as fp:
    for ch in chunky(fp):
        output_arr = ch
        decompressed_frame = frm.decompress(output_arr)
        with frm.open(dec_path, 'ab') as fj:
            fj.write(decompressed_frame)

这种方法当然有很多边缘情况((如果 被读取分成一个块读取......,拆分函数将不会'打不开,会出现无法识别的帧。(不过这种情况不常发生,解压和写入的时间还是大于18小时)

创建一个 JSON/txt 文件,其中包含类似地图的结构。其中每个键映射到每个帧的开始和结束位置(以字节为单位)。然后将数据存储为字典,然后利用它以适当的字节数读取每个帧。 写入压缩文件:

# In this example there are no separators between each frame as I am keeping track of positions.
# map_to_sort is a dictionary organized like so 
# int k : ["E:\Frame_Data\cam_compressed_0.txt", #compressed frame, 56000, 10000, "E:\Frame_Data\cam_compressed_0_mapper_data.txt"]
map_to_sort.sort()
for k, frame_compressed in map_to_sort.mapper.items():
    with frm.open(frame_compressed[0], 'ab') as fp:
         fp.write(frame_compressed[1])
    with open(frame_compressed[4], 'w') as fil:
         str_to_write = str(k) + "," + str(frame_compressed[2]) + "," + str(frame_compressed[3]) + "," + frame_compressed[0] + "," + "\n"
         fil.write(str_to_write)
map_to_sort.clear()

读写解压文件

with open(TEXT_COMPRESSED_MAPPER_PATH, 'r') as fr:
     for line in fr:
         str_arr = (line.strip()).split(",")
         file_comp_read_from = str_arr[3]
         start_byte_pos = str_arr[1]
         end_byte_pos = str_arr[2]
         with frm.open(str_arr[3], 'rb') as fp:
             fp.seek(start_byte_pos, 0)
             fp.read(end_byte_pos - start_byte_pos)
         # Same decompression code as before

然而,这样做的缺点是文本或 json 文件占用的内存(并非无关紧要)和 RAM(RAM 限制)。以及帧读取不一致这一事实,我会得到许多没有正确解压缩的帧。也许我不确定我是否正在收集相对于框架开始和结束位置的文件大小的真实确切位置。如果有人有任何指示也可以提供帮助。

最有可能帮助的最后一件事是将 lz4 压缩级别从 7 提高到更高的级别。较小的文件大小无疑会减少管理。 (减压对我的 CPU 来说不是那么密集)。

在压缩级别 7 时,我已经使用了大约 70% 的所有 8 个内核,在压缩级别 8 时,我开始丢失帧,因为解压缩时间太长,并且 CPU 使用率在98%.

很多压缩代码使用了多线程、多处理的组合。代码太长 post 这里也不一定允许 post 包含它的 link (目前还没有开源,我可以 post 部分可能如果你愿意的话)。

基本思想是让一个进程处理记录并将帧放入共享队列,另一个进程同时发生,检查队列是否为空,然后利用线程处理队列(帧弹出)一次压缩多个帧。然后将未排序的帧排序并按顺序写入相应的文件。如果有人对如何显着提高压缩率而不给我的 CPU 造成太大压力,那就太好了。

我对这种设计不太满意,因为在两个进程之间使用共享队列并不是很好的做法。然而,我看到的选择很少,因为我使用的英特尔深度相机在多处理模块上的表现不是特别好,所以我无法轻松地为每个相机设置一个进程。 Intel Depth Cameras python 模块的实施也不适合以简单的方式同时记录和流式传输。

如果您希望在我处理文件压缩的​​过程中看到更多代码,i/o请告诉我。

随意提及有损或其他更快或

抱歉,信息量太大。

TLDR: 在 python 中使用 lz4 压缩在短时间内解压缩大量文件,然后利用 I/0 将解压缩的文件写入另一个硬盘驱动器时出现问题。 上面的一些代码和确切信息。 目标是能够在 18 小时或更短时间内解压所有文件并将它们写入另一个硬盘。

编辑

我能够利用解决方案中建议的两种方法获得我正在寻找的压缩级别。 (更快的读写时间)。然而,这些在 6 级压缩时使用 gzip 效果最好,而不是 lz4。我发现我的大部分文件的压缩率都在 6:1 左右。 cpu 的内核越多、质量越高,您获得的结果就越好。我有 4 个同步深度流,使用 AMD Ryzen 7 5800X 处理器,任何更高的压缩级别,我开始丢帧。理论上,如果您拥有更多核心,您可能会获得更好的结果。我还没有测试过这个。您将 运行 遇到的主要瓶颈是硬盘的速度。如果您有预算,我强烈建议您使用 SSD。

这是关于使用固定长度 header 拆分文件中的 blob 的概念证明。不确定这是否解决了您所有的问题,但也许是部分问题。

import random
import string
import tempfile
import os

def bytes_generator():
    out = []
    for i in range(random.randint(10, 20)):
        out.append(random.choice(string.ascii_lowercase))
    return ''.join(out).encode('utf-8')

INT_BYTE_SIZE = 8
def int_to_bytes(x: int) -> bytes:
    return x.to_bytes(INT_BYTE_SIZE, 'big')
def int_from_bytes(xbytes: bytes) -> int:
    return int.from_bytes(xbytes, 'big')

def run():
    filehandle = tempfile.TemporaryFile()
    blobs = []
    for i in range(10):
        data = bytes_generator()
        blobs.append(data)
        int_bytes = int_to_bytes(len(data))
        filehandle.write(int_bytes)
        filehandle.write(data)

    # Reset as if just opened
    filehandle.seek(0, os.SEEK_SET)

    # Get file legnth
    filehandle.seek(0, os.SEEK_END)
    file_length = filehandle.tell()

    # Reset for reading
    filehandle.seek(0, os.SEEK_SET)

    i = 0
    while filehandle.tell() < file_length:
        data_length = int_from_bytes(filehandle.read(INT_BYTE_SIZE))
        read_data = filehandle.read(data_length)
        assert read_data == blobs[i]
        i += 1

if __name__ == "__main__":
    run()

int/byte 或多或少从 Converting int to bytes in Python 3 偷来的转换