Python: 从 s3 流式传输 gzip 文件
Python: Stream gzip files from s3
我在 s3 中有文件作为 gzip 块,因此我必须连续读取数据并且不能读取随机数据。我总是必须从第一个文件开始。
例如,假设我在 s3 中有 3 个 gzip 文件,f1.gz
、f2.gz
、f3.gz
。如果我全部下载到本地,我可以做到cat * | gzip -d
。如果我这样做 cat f2.gz | gzip -d
,它将失败并显示 gzip: stdin: not in gzip format
。
如何使用 python 从 s3 流式传输这些数据?我看到了 smart-open 并且它有解压缩 gz 文件的能力
from smart_open import smart_open, open
with open(path, compression='.gz') as f:
for line in f:
print(line.strip())
其中路径是 f1.gz
的路径。这一直有效,直到它到达文件末尾,它将在此处中止。同样的事情也会在本地发生,如果我做 cat f1.gz | gzip -d
,它会在结束时出现 gzip: stdin: unexpected end of file
错误。
有没有办法让它使用 python 连续流式传输文件?
这个不会中止,可以遍历f1.gz
、f2.gz
和f3.gz
with open(path, 'rb', compression='disable') as f:
for line in f:
print(line.strip(), end="")
但输出只是字节。我原以为它可以通过使用上面的代码执行 python test.py | gzip -d
来工作,但我得到一个错误 gzip: stdin: not in gzip format
。有没有办法 python 使用 gzip 可以读取的智能打开进行打印?
For example lets say I have 3 gzip file in s3, f1.gz
, f2.gz
, f3.gz
. If I download all locally, I can do cat * | gzip -d
.
一个想法是创建一个文件对象来实现它。文件对象从一个文件句柄读取,耗尽它,从下一个文件句柄读取,耗尽它,等等。这类似于 cat
内部的工作方式。
这样做的方便之处在于,它与连接所有文件的作用相同,而无需同时读取所有文件而占用内存。
获得组合文件对象包装器后,您可以将其传递给 Python 的 gzip
模块以解压缩文件。
示例:
import gzip
class ConcatFileWrapper:
def __init__(self, files):
self.files = iter(files)
self.current_file = next(self.files)
def read(self, *args):
ret = self.current_file.read(*args)
if len(ret) == 0:
# EOF
# Optional: close self.current_file here
# self.current_file.close()
# Advance to next file and try again
try:
self.current_file = next(self.files)
except StopIteration:
# Out of files
# Return an empty string
return ret
# Recurse and try again
return self.read(*args)
return ret
def write(self):
raise NotImplementedError()
filenames = ["xaa", "xab", "xac", "xad"]
filehandles = [open(f, "rb") for f in filenames]
wrapper = ConcatFileWrapper(filehandles)
with gzip.open(wrapper) as gf:
for line in gf:
print(line)
# Close all files
[f.close() for f in filehandles]
我是这样测试的:
我创建了一个文件来通过以下命令对此进行测试。
创建内容为 1 到 1000 的文件。
$ seq 1 1000 > foo
压缩它。
$ gzip foo
拆分文件。这会生成四个名为 xaa-xad.
的文件
$ split -b 500 foo.gz
运行上面的Python文件就可以了,应该打印出1 - 1000.
编辑:关于 lazy-opening 文件的额外说明
如果您有大量文件,您可能希望一次只打开一个文件。这是一个例子:
def open_files(filenames):
for filename in filenames:
# Note: this will leak file handles unless you uncomment the code above that closes the file handles again.
yield open(filename, "rb")
我在 s3 中有文件作为 gzip 块,因此我必须连续读取数据并且不能读取随机数据。我总是必须从第一个文件开始。
例如,假设我在 s3 中有 3 个 gzip 文件,f1.gz
、f2.gz
、f3.gz
。如果我全部下载到本地,我可以做到cat * | gzip -d
。如果我这样做 cat f2.gz | gzip -d
,它将失败并显示 gzip: stdin: not in gzip format
。
如何使用 python 从 s3 流式传输这些数据?我看到了 smart-open 并且它有解压缩 gz 文件的能力
from smart_open import smart_open, open
with open(path, compression='.gz') as f:
for line in f:
print(line.strip())
其中路径是 f1.gz
的路径。这一直有效,直到它到达文件末尾,它将在此处中止。同样的事情也会在本地发生,如果我做 cat f1.gz | gzip -d
,它会在结束时出现 gzip: stdin: unexpected end of file
错误。
有没有办法让它使用 python 连续流式传输文件?
这个不会中止,可以遍历f1.gz
、f2.gz
和f3.gz
with open(path, 'rb', compression='disable') as f:
for line in f:
print(line.strip(), end="")
但输出只是字节。我原以为它可以通过使用上面的代码执行 python test.py | gzip -d
来工作,但我得到一个错误 gzip: stdin: not in gzip format
。有没有办法 python 使用 gzip 可以读取的智能打开进行打印?
For example lets say I have 3 gzip file in s3,
f1.gz
,f2.gz
,f3.gz
. If I download all locally, I can docat * | gzip -d
.
一个想法是创建一个文件对象来实现它。文件对象从一个文件句柄读取,耗尽它,从下一个文件句柄读取,耗尽它,等等。这类似于 cat
内部的工作方式。
这样做的方便之处在于,它与连接所有文件的作用相同,而无需同时读取所有文件而占用内存。
获得组合文件对象包装器后,您可以将其传递给 Python 的 gzip
模块以解压缩文件。
示例:
import gzip
class ConcatFileWrapper:
def __init__(self, files):
self.files = iter(files)
self.current_file = next(self.files)
def read(self, *args):
ret = self.current_file.read(*args)
if len(ret) == 0:
# EOF
# Optional: close self.current_file here
# self.current_file.close()
# Advance to next file and try again
try:
self.current_file = next(self.files)
except StopIteration:
# Out of files
# Return an empty string
return ret
# Recurse and try again
return self.read(*args)
return ret
def write(self):
raise NotImplementedError()
filenames = ["xaa", "xab", "xac", "xad"]
filehandles = [open(f, "rb") for f in filenames]
wrapper = ConcatFileWrapper(filehandles)
with gzip.open(wrapper) as gf:
for line in gf:
print(line)
# Close all files
[f.close() for f in filehandles]
我是这样测试的:
我创建了一个文件来通过以下命令对此进行测试。
创建内容为 1 到 1000 的文件。
$ seq 1 1000 > foo
压缩它。
$ gzip foo
拆分文件。这会生成四个名为 xaa-xad.
的文件$ split -b 500 foo.gz
运行上面的Python文件就可以了,应该打印出1 - 1000.
编辑:关于 lazy-opening 文件的额外说明
如果您有大量文件,您可能希望一次只打开一个文件。这是一个例子:
def open_files(filenames):
for filename in filenames:
# Note: this will leak file handles unless you uncomment the code above that closes the file handles again.
yield open(filename, "rb")