Python: 从 s3 流式传输 gzip 文件

Python: Stream gzip files from s3

我在 s3 中有文件作为 gzip 块,因此我必须连续读取数据并且不能读取随机数据。我总是必须从第一个文件开始。

例如,假设我在 s3 中有 3 个 gzip 文件,f1.gzf2.gzf3.gz。如果我全部下载到本地,我可以做到cat * | gzip -d。如果我这样做 cat f2.gz | gzip -d,它将失败并显示 gzip: stdin: not in gzip format

如何使用 python 从 s3 流式传输这些数据?我看到了 smart-open 并且它有解压缩 gz 文件的能力

from smart_open import smart_open, open

with open(path, compression='.gz') as f:
    for line in f:
        print(line.strip())

其中路径是 f1.gz 的路径。这一直有效,直到它到达文件末尾,它将在此处中止。同样的事情也会在本地发生,如果我做 cat f1.gz | gzip -d,它会在结束时出现 gzip: stdin: unexpected end of file 错误。

有没有办法让它使用 python 连续流式传输文件?

这个不会中止,可以遍历f1.gzf2.gzf3.gz

with open(path, 'rb', compression='disable') as f:
    for line in f:
        print(line.strip(), end="")

但输出只是字节。我原以为它可以通过使用上面的代码执行 python test.py | gzip -d 来工作,但我得到一个错误 gzip: stdin: not in gzip format。有没有办法 python 使用 gzip 可以读取的智能打开进行打印?

For example lets say I have 3 gzip file in s3, f1.gz, f2.gz, f3.gz. If I download all locally, I can do cat * | gzip -d.

一个想法是创建一个文件对象来实现它。文件对象从一个文件句柄读取,耗尽它,从下一个文件句柄读取,耗尽它,等等。这类似于 cat 内部的工作方式。

这样做的方便之处在于,它与连接所有文件的作用相同,而无需同时读取所有文件而占用内存。

获得组合文件对象包装器后,您可以将其传递给 Python 的 gzip 模块以解压缩文件。

示例:

import gzip

class ConcatFileWrapper:
    def __init__(self, files):
        self.files = iter(files)
        self.current_file = next(self.files)
    def read(self, *args):
        ret = self.current_file.read(*args)
        if len(ret) == 0:
            # EOF
            # Optional: close self.current_file here
            # self.current_file.close()
            # Advance to next file and try again
            try:
                self.current_file = next(self.files)
            except StopIteration:
                # Out of files
                # Return an empty string
                return ret
            # Recurse and try again
            return self.read(*args)
        return ret
    def write(self):
        raise NotImplementedError()

filenames = ["xaa", "xab", "xac", "xad"]
filehandles = [open(f, "rb") for f in filenames]
wrapper = ConcatFileWrapper(filehandles)

with gzip.open(wrapper) as gf:
    for line in gf:
        print(line)

# Close all files
[f.close() for f in filehandles]

我是这样测试的:

我创建了一个文件来通过以下命令对此进行测试。

创建内容为 1 到 1000 的文件。

$ seq 1 1000 > foo

压缩它。

$ gzip foo

拆分文件。这会生成四个名为 xaa-xad.

的文件
$ split -b 500 foo.gz

运行上面的Python文件就可以了,应该打印出1 - 1000.

编辑:关于 lazy-opening 文件的额外说明

如果您有大量文件,您可能希望一次只打开一个文件。这是一个例子:

def open_files(filenames):
    for filename in filenames:
        # Note: this will leak file handles unless you uncomment the code above that closes the file handles again.
        yield open(filename, "rb")