从 google 云存储流式传输 gzip 文件

streaming gzipped files from google cloud storage

我想直接从 Google 云存储中读取压缩文件并使用 Python csv 包打开它们。 本地文件的代码为:

def reader(self):
    print "reading local compressed file: ", self._filename
    self._localfile = gzip.open(self._filename, 'rb')
    csvReader = csv.reader(self._localfile, delimiter=',', quotechar='"')
    return csvReader

我玩过几个 GCS API(基于 JSON,cloud.storage),但其中 none 似乎给了我一些可以通过 gzip 流式传输的东西。更有什者,即使文件解压了,我也打不开文件给cv.reader(Iterator类型)

我压缩的 CSV 文件大约有 500MB,而未压缩的文件最多占用几 GB。我认为这不是一个好主意:1 - 在打开文件之前在本地下载文件(除非我可以重叠下载和计算)或 2 - 在计算之前完全在内存中打开它。

最后,我在我的本地机器上 运行 此代码,但最终,我将转移到 AppEngine,所以它也必须在那里工作。

谢谢!!

使用GCS, cloudstorage.open(filename, 'r') will give you a read-only file-like object (earlier created similarly but with 'w':-) which you can use, a chunk at a time, with the standard Python library's zlib module,特别是zlib.decompressobj,当然,如果GS对象最初是以互补方式创建的(使用zlib.compressobj)。

或者,为方便起见,您可以使用标准 Python 库的 gzip module,例如,对于阅读阶段:

compressed_flo = cloudstorage.open('objname', 'r')
uncompressed_flo = gzip.GzipFile(fileobj=compressed_flo,mode='rb')
csvReader = csv.reader(uncompressed_flo)

当然,对于早期的写作阶段,反之亦然。

请注意,当您在本地 运行(使用 dev_appserver)时,GCS 客户端库使用 本地磁盘文件 来模拟 GCS——在我的有利于开发目的的经验,当我需要从我的本地工作站与 "real" GCS 存储交互时,我可以使用 gsutil 或其他工具...当我需要从我的 GAE 进行此类交互时,GCS应用程序(以及首先在本地开发所述 GAE 应用程序:-)。

因此,您已将文件压缩存储在 GCS 上。您可以以类似流的方式 处理存储在 GCS 上的数据。也就是说,您可以同时下载、解压缩和处理。这避免了

  • 将解压后的文件存入磁盘
  • 必须等到下载完成才能处理数据。

gzip 文件有一个小的页眉和页脚,主体是一个压缩流,由一系列块组成,每个块都可以自行解压缩。 Python 的 zlib package 可以帮助您!

编辑: 这是关于如何解压缩和分析 zlib 或 gzip 流块的示例代码,完全基于 zlib

import zlib
from collections import Counter


def stream(filename):
    with open(filename, "rb") as f:
        while True:
            chunk = f.read(1024)
            if not chunk:
                break
            yield chunk


def decompress(stream):
    # Generate decompression object. Auto-detect and ignore
    # gzip wrapper, if present.
    z = zlib.decompressobj(32+15)
    for chunk in stream:
        r = z.decompress(chunk)
        if r:
            yield r


c = Counter()
s = stream("data.gz")
for chunk in decompress(s):
    for byte in chunk:
        c[byte] += 1


print c

我用示例文件 data.gz 测试了这段代码,该文件是用 GNU gzip 创建的。

引用自http://www.zlib.net/manual.html

windowBits can also be greater than 15 for optional gzip decoding. Add 32 to windowBits to enable zlib and gzip decoding with automatic header detection, or add 16 to decode only the gzip format (the zlib format will return a Z_DATA_ERROR). If a gzip stream is being decoded, strm->adler is a crc32 instead of an adler32.

Any information contained in the gzip header is not retained [...]