Python:压缩和save/load大数据from/into内存

Python: compress and save/load large data from/into memory

我有一个巨大的字典,其中包含 numpy 数组作为值,几乎占用了所有 RAM。不可能完全腌制或压缩它。我已经使用 zlib 分块检查了 some of solutions 到 read/write,但是当我想 read/write [=31] 时,它们会处理文件 StringIO 等=]内存。

,但它只有写作部分。这样保存后如何读取对象,因为块是写在一起的,压缩后的块当然长度不同?

import zlib


class ZlibWrapper():
    # chunksize is used to save memory, otherwise huge object will be copied
    def __init__(self, filename, chunksize=268435456): # 256 MB
        self.filename = filename
        self.chunksize = chunksize


    def save(self, data): 
        """Saves a compressed object to disk
        """
        mdata = memoryview(data)
        with open(self.filename, 'wb') as f:
          for i in range(0, len(mdata), self.chunksize):
             mychunk = zlib.compress(bytes(mdata[i:i+self.chunksize]))
             f.write(mychunk)

    def load(self):

        # ???

        return data

不幸的是,未压缩的对象太大而无法通过网络发送,并且在外部压缩它们会产生额外的复杂性。

不幸的是,Pickle 开始消耗 RAM 并且系统挂起。

在与 Charles Duffy 的讨论之后,这是我的序列化尝试(目前不起作用 - 甚至不压缩字符串):

import zlib

import json

import numpy as np



mydict = {"a":np.array([1,2,3]),"b":np.array([4,5,6]),"c":np.array([0,0,0])}


#------------


# write to compressed stream ---------------------

def string_stream_serialization(dic):
    for key, val in dic.items():        
        #key_encoded = key.encode("utf-8")  # is not json serializable
        yield json.dumps([key,val.tolist()])


output = ""
compressor = zlib.compressobj()
decompressor = zlib.decompressobj()


stream = string_stream_serialization(mydict)

with open("outfile.compressed", "wb") as f:
    for s in stream:
        if not s:
            f.write(compressor.flush())
            break
        f.write(compressor.compress(s.encode('utf-8'))) # .encode('utf-8') converts to bytes




# read from compressed stream: --------------------

def read_in_chunks(file_object, chunk_size=1024): # I set another chunk size intentionally
    """Lazy function (generator) to read a file piece by piece.
    Default chunk size: 1k."""
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


reconstructed = {}

with open("outfile.compressed", "rb") as f:
    for s in read_in_chunks(f):
        data = decompressor.decompress(decompressor.unconsumed_tail + s)
        while data:
            arr = json.loads(data.decode("utf-8"))            
            reconstructed[arr[0]] = np.array(arr[1])
            data = decompressor.decompress(decompressor.unconsumed_tail)


print(reconstructed)

要将字典写入磁盘,zipfile 模块非常适合。

  • 保存时 - 将每个块保存为 zip 文件。
  • 加载时 - 迭代 zip 中的文件并重建数据。

您的首要关注点应该是拥有一种合理的方式来序列化和反序列化您的数据。我们对您在问题本身或在对相同问题的评论中提供的数据有一些限制:

  • 您的数据由一个包含大量 key/value 对的字典组成
  • 所有键都是 unicode 字符串
  • 所有值都是 numpy 数组,它们单独足够短,可以在任何给定时间轻松放入内存(甚至允许任何单个值的多个副本),尽管 总计 所需的存储空间变得非常大。

这表明一个相当简单的实现:

def serialize(f, content):
    for k,v in content.items():
        # write length of key, followed by key as string
        k_bstr = k.encode('utf-8')
        f.write(struct.pack('L', len(k_bstr)))
        f.write(k_bstr)
        # write length of value, followed by value in numpy.save format
        memfile = io.BytesIO()
        numpy.save(memfile, v)
        f.write(struct.pack('L', memfile.tell()))
        f.write(memfile.getvalue())

def deserialize(f):
    retval = {}
    while True:
        content = f.read(struct.calcsize('L'))
        if not content: break
        k_len = struct.unpack('L', content)[0]
        k_bstr = f.read(k_len)
        k = k_bstr.decode('utf-8')
        v_len = struct.unpack('L', f.read(struct.calcsize('L')))[0]
        v_bytes = io.BytesIO(f.read(v_len))
        v = numpy.load(v_bytes)
        retval[k] = v
    return retval

作为一个简单的测试:

test_file = io.BytesIO()
serialize(test_file, {
    "First Key": numpy.array([123,234,345]),
    "Second Key": numpy.array([321,432,543]),
})

test_file.seek(0)
print(deserialize(test_file))

...所以,我们知道了 - 现在,我们如何添加压缩?轻松。

with gzip.open('filename.gz', 'wb') as gzip_file:
    serialize(gzip_file, your_data)

...或者,在减压端:

with gzip.open('filename.gz', 'rb') as gzip_file:
    your_data = deserialize(gzip_file)

这是可行的,因为 gzip 库已经按要求流出数据,而不是一次性压缩或解压。 不需要开窗把自己分块——把它留给下层。