从 S3 读取 ZIP 文件而不下载整个文件
Read ZIP files from S3 without downloading the entire file
我们有大小为 5-10GB 的 ZIP 文件。典型的 ZIP 文件有 5-10 个内部文件,每个未压缩的大小为 1-5 GB。
我有一套很好的 Python 工具来读取这些文件。基本上,我可以打开一个文件名,如果有 ZIP 文件,工具会在 ZIP 文件中搜索,然后打开压缩文件。这一切都相当透明。
我想将这些文件作为压缩文件存储在 Amazon S3 中。我可以获取 S3 文件的范围,所以应该可以获取 ZIP 中央目录(它是文件的末尾,所以我可以只读取最后的 64KiB),找到我想要的组件,下载它,然后直接流式传输到调用过程。
所以我的问题是,如何通过标准 Python ZipFile API 做到这一点?没有记录如何用支持 POSIX 语义的任意对象替换文件系统传输。这是否可以不重写模块?
下面是允许您像打开普通文件一样打开 Amazon S3 上的文件的代码。请注意,我使用 aws
命令,而不是 boto3
Python 模块。 (我无权访问 boto3。)您可以打开文件并在其上搜索。该文件缓存在本地。如果您使用 Python ZipFile API 打开文件并且它是一个 ZipFile,则您可以阅读各个部分。但是,您不能写入,因为 S3 不支持部分写入。
另外,我实现了 s3open(),它可以打开文件进行读写,但是没有实现 ZipFile 所需要的 seek 接口。
from urllib.parse import urlparse
from subprocess import run,Popen,PIPE
import copy
import json
import os
import tempfile
# Tools for reading and write files from Amazon S3 without boto or boto3
# http://boto.cloudhackers.com/en/latest/s3_tut.html
# but it is easier to use the aws cli, since it's configured to work.
def s3open(path, mode="r", encoding=None):
    """Open an S3 object as a non-seekable stream through the ``aws`` CLI.

    The function spawns ``aws s3 cp`` and returns one end of the child's pipe,
    so objects of any size can be streamed, but the result cannot seek.
    The AWS CLI is used instead of boto because it is already configured.

    Args:
        path: Location handed verbatim to ``aws s3 cp`` (e.g. ``s3://bucket/key``).
        mode: Must contain ``"r"`` (read) or ``"w"`` (write); add ``"b"`` for
            binary.  Append (``"a"``) and update (``"+"``) are rejected.
        encoding: Text encoding.  Defaults to ``"utf-8"`` in text mode and
            must be left as ``None`` in binary mode.

    Returns:
        The child's stdout pipe (read mode) or stdin pipe (write mode).

    Raises:
        AssertionError: If the mode/encoding combination is not supported.
        RuntimeError: If ``mode`` contains neither ``"r"`` nor ``"w"``.
    """
    from subprocess import PIPE, Popen

    # NOTE: these asserts are stripped under ``python -O``; they are kept so
    # callers keep seeing the same AssertionError as before.
    if "b" in mode:
        assert encoding is None
    elif encoding is None:
        encoding = "utf-8"
    assert 'a' not in mode
    assert '+' not in mode

    if "r" in mode:
        # "-" as the destination makes the CLI write the object to stdout.
        p = Popen(['aws', 's3', 'cp', path, '-'], stdout=PIPE, encoding=encoding)
        return p.stdout
    elif "w" in mode:
        # "-" as the source makes the CLI upload whatever is written to stdin.
        p = Popen(['aws', 's3', 'cp', '-', path], stdin=PIPE, encoding=encoding)
        return p.stdin
    else:
        raise RuntimeError("invalid mode:{}".format(mode))
CACHE_SIZE=4096 # big enough for front and back caches
MAX_READ=65536*16 # upper bound on the bytes returned by a single read(-1)
debug=False # set to True to trace aws commands, seeks and reads on stdout


class S3File:
    """Read-only, seekable file-like view of an S3 object, backed by the aws CLI.

    The first and last CACHE_SIZE bytes of the object are fetched once and kept
    in memory; every other read issues a ranged ``aws s3api get-object`` that
    lands in a local temporary file.  The object exposes enough of the file
    protocol (read/seek/tell/seekable) to be handed to ``zipfile.ZipFile``.
    """

    def __init__(self,name,mode='rb'):
        """Look up the object's size and prime the front and back caches.

        Args:
            name: ``s3://bucket/key`` URL of the object.
            mode: Accepted for ``open()``-style call compatibility; not inspected.

        Raises:
            RuntimeError: If the URL scheme is not ``s3``.
        """
        self.name = name
        self.url = urlparse(name)
        if self.url.scheme != 's3':
            raise RuntimeError("url scheme is {}; expecting s3".format(self.url.scheme))
        self.bucket = self.url.netloc
        self.key = self.url.path[1:]
        self.fpos = 0
        self.tf = tempfile.NamedTemporaryFile()
        cmd = ['aws','s3api','list-objects','--bucket',self.bucket,'--prefix',self.key,'--output','json']
        data = json.loads(Popen(cmd,encoding='utf8',stdout=PIPE).communicate()[0])
        # NOTE(review): this is a prefix listing, so the first entry is used
        # even if its key is merely prefixed by self.key -- confirm that is OK.
        file_info = data['Contents'][0]
        self.length = file_info['Size']
        self.ETag = file_info['ETag']

        # Load the caches.  An empty object has nothing to fetch, and a ranged
        # GET on it would be rejected.
        if self.length > 0:
            self.frontcache = self._readrange(0, min(CACHE_SIZE, self.length))
        else:
            self.frontcache = b""
        if self.length > CACHE_SIZE:
            self.backcache_start = self.length-CACHE_SIZE
            if debug: print("backcache starts at {}".format(self.backcache_start))
            self.backcache = self._readrange(self.backcache_start,CACHE_SIZE)
        else:
            self.backcache_start = None
            self.backcache = None

    def _readrange(self,start,length):
        """Fetch ``length`` bytes starting at ``start`` with a ranged get-object."""
        # The CLI writes the range into the named temporary file rather than a
        # pipe (pipes were not showing up in /dev/fd).
        cmd = ['aws','s3api','get-object','--bucket',self.bucket,'--key',self.key,'--output','json',
               '--range','bytes={}-{}'.format(start,start+length-1),self.tf.name]
        if debug:print(cmd)
        data = json.loads(Popen(cmd,encoding='utf8',stdout=PIPE).communicate()[0])
        if debug:print(data)
        # Re-open the file by name: the long-lived buffered handle in self.tf
        # could otherwise serve bytes left over from a previous range.
        with open(self.tf.name,'rb') as f:
            return f.read(length)

    def __repr__(self):
        return "FakeFile<name:{} url:{}>".format(self.name,self.url)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        # zipfile queries this on the underlying file object.
        return True

    def read(self,length=-1):
        """Return up to ``length`` bytes from the current position.

        A negative (or ``None``) length reads to the end of the object, capped
        at MAX_READ bytes per call.  Returns ``b""`` at or past end of file.
        """
        remaining = self.length - self.fpos
        if remaining <= 0:
            return b""
        if length is None or length < 0:
            length = min(MAX_READ, remaining)
        else:
            # Never ask S3 for bytes beyond the end of the object.
            length = min(length, remaining)
        if length == 0:
            return b""
        if debug:
            print("read: fpos={} length={}".format(self.fpos,length))

        # Can we satisfy from the front cache?
        if self.fpos + length <= len(self.frontcache):
            if debug:print("front cache")
            buf = self.frontcache[self.fpos:self.fpos+length]
            self.fpos += len(buf)
            if debug:print("return 1: buf=",buf)
            return buf

        # Can we satisfy from the back cache?
        if self.backcache and self.fpos >= self.backcache_start:
            if debug:print("back cache")
            offset = self.fpos - self.backcache_start
            buf = self.backcache[offset:offset + length]
            self.fpos += len(buf)
            if debug:print("return 2: buf=",buf)
            return buf

        buf = self._readrange(self.fpos, length)
        self.fpos += len(buf)
        if debug:print("return 3: buf=",buf)
        return buf

    def seek(self,offset,whence=0):
        """Move the read position and return the new absolute position.

        Raises:
            RuntimeError: If ``whence`` is not 0, 1 or 2.
            OSError: If the resulting position would be negative (zipfile
                relies on this to detect files too short for a ZIP trailer).
        """
        if debug:print("seek({},{})".format(offset,whence))
        if whence==0:
            target = offset
        elif whence==1:
            target = self.fpos + offset
        elif whence==2:
            target = self.length + offset
        else:
            raise RuntimeError("whence={}".format(whence))
        if target < 0:
            raise OSError("negative seek position {}".format(target))
        self.fpos = target
        if debug:print(" ={} (self.length={})".format(self.fpos,self.length))
        return self.fpos

    def tell(self):
        return self.fpos

    def write(self, *args, **kwargs):
        # Accept the usual write(data) call shape so callers get the intended
        # error instead of a TypeError about the argument count.
        raise RuntimeError("Write not supported")

    def flush(self):
        raise RuntimeError("Flush not supported")

    def close(self):
        """Release the local temporary file (safe to call more than once)."""
        tf = getattr(self, "tf", None)
        if tf is not None:
            tf.close()
        return
这是一种不需要获取整个文件的方法(完整版本可用 here)。
它确实需要 boto(或 boto3),除非您可以通过 AWS CLI 模拟带 Range 的远程 GET 请求(我想这也是很有可能的)。
import sys
import zlib
import zipfile
import io
import boto
from boto.s3.connection import OrdinaryCallingFormat
# range-fetches a S3 key
def fetch(key, start, len):
    """Return ``len`` bytes of ``key`` starting at byte offset ``start``.

    ``key`` is expected to provide ``get_contents_as_string(headers=...)``
    (a boto S3 key).  A non-positive ``len`` returns an empty byte string
    without contacting S3, because the inverted Range header it would produce
    (e.g. ``bytes=5-4``) is not a valid range request.
    """
    if len <= 0:
        return b""
    end = start + len - 1  # HTTP byte ranges are inclusive on both ends
    return key.get_contents_as_string(headers={"Range": "bytes=%d-%d" % (start, end)})
# parses 2 or 4 little-endian bytes into their corresponding integer value
def parse_int(bytes):
    """Decode a 2- or 4-byte little-endian unsigned integer.

    Works on both Python 2 ``str`` and Python 3 ``bytes`` input: going through
    ``bytearray`` yields integers when indexed on either version, whereas
    ``ord()`` on an indexed Python 3 ``bytes`` object raises TypeError.
    Only the first two bytes are used unless at least four are supplied.
    """
    data = bytearray(bytes)
    val = data[0] + (data[1] << 8)
    if len(data) > 3:
        val += (data[2] << 16) + (data[3] << 24)
    return val
"""
bucket: name of the bucket
key: path to zipfile inside bucket
entry: pathname of zip entry to be retrieved (path/to/subdir/file.name)
"""
# NOTE(review): `bucket`, `key` and `entry` are not defined anywhere in this
# snippet; they must be assigned (see the description above) before it runs.

# OrdinaryCallingFormat prevents certificate errors on bucket names with dots
_bucket = boto.connect_s3(calling_format=OrdinaryCallingFormat()).get_bucket(bucket)
_key = _bucket.get_key(key)

# fetch the last 22 bytes (end-of-central-directory record; assuming the comment field is empty)
size = _key.size
eocd = fetch(_key, size - 22, 22)

# start offset and size of the central directory
cd_start = parse_int(eocd[16:20])
cd_size = parse_int(eocd[12:16])

# fetch central directory, append EOCD, and open as zipfile!
cd = fetch(_key, cd_start, cd_size)
zip = zipfile.ZipFile(io.BytesIO(cd + eocd))

for zi in zip.filelist:
    if zi.filename == entry:
        # local file header starting at file name length + file content
        # (so we can reliably skip file name and extra fields)

        # in our "mock" zipfile, `header_offset`s are negative (probably because the leading content is missing)
        # so we have to add to it the CD start offset (`cd_start`) to get the actual offset
        file_head = fetch(_key, cd_start + zi.header_offset + 26, 4)
        name_len = parse_int(file_head[0:2])
        extra_len = parse_int(file_head[2:4])

        # 30 is the offset of the file name inside the local file header
        content = fetch(_key, cd_start + zi.header_offset + 30 + name_len + extra_len, zi.compress_size)

        # now `content` has the file entry you were looking for!
        # you should probably decompress it in context before passing it to some other program
        if zi.compress_type == zipfile.ZIP_DEFLATED:
            # -15 selects a raw deflate stream (no zlib header), as stored in ZIP entries
            print(zlib.decompressobj(-15).decompress(content))
        else:
            print(content)
        break
在您的情况下,您可能需要将获取的内容写入本地文件(由于文件较大),除非内存使用不是问题。
这是已给出解决方案的改进版本 - 现在它使用 boto3 并处理大于 4GiB 的文件:
import boto3
import io
import struct
import zipfile
# Module-level boto3 S3 client used by get_file_size() and fetch().
s3 = boto3.client('s3')
# Size in bytes of the end-of-central-directory (EOCD) record when the archive comment is empty.
EOCD_RECORD_SIZE = 22
# Size in bytes of the fixed part of the ZIP64 EOCD record.
ZIP64_EOCD_RECORD_SIZE = 56
# Size in bytes of the ZIP64 EOCD locator that sits between the ZIP64 EOCD record and the EOCD record.
ZIP64_EOCD_LOCATOR_SIZE = 20
# Largest value a 32-bit ZIP size/offset field can hold (2**32 - 1); larger archives need ZIP64.
MAX_STANDARD_ZIP_SIZE = 4_294_967_295
def lambda_handler(event, context=None):
    """Entry point: list the entries of the ZIP archive named by ``event``.

    Args:
        event: Mapping with the keys ``'bucket'`` and ``'key'`` identifying
            the archive in S3.
        context: Unused.  Accepted (with a default) because AWS Lambda invokes
            handlers with two positional arguments, ``(event, context)``;
            existing one-argument callers keep working.
    """
    bucket = event['bucket']
    key = event['key']
    zip_file = get_zip_file(bucket, key)
    print_zip_content(zip_file)
def get_zip_file(bucket, key):
    """Build a ``zipfile.ZipFile`` holding only the archive's directory records.

    Only the trailing records of the S3 object and its central directory are
    downloaded; they are concatenated into an in-memory file that ``zipfile``
    can open to list entries (entry data itself is not present).

    ZIP64 is detected from the ZIP64 locator signature rather than from the
    object size, so ZIP64 archives smaller than 4 GiB are handled too.  The
    archive comment is assumed to be empty, and the ZIP64 EOCD record is
    assumed to sit immediately before its locator.

    Raises:
        zipfile.BadZipFile: If the object is too short or its trailing records
            do not carry the expected signatures.
    """
    eocd_signature = b"PK\x05\x06"
    zip64_locator_signature = b"PK\x06\x07"
    zip64_eocd_signature = b"PK\x06\x06"

    file_size = get_file_size(bucket, key)
    if file_size < EOCD_RECORD_SIZE:
        raise zipfile.BadZipFile("object is too small to be a ZIP archive")

    # One ranged GET covers every trailing record we might need.
    trailer_size = EOCD_RECORD_SIZE + ZIP64_EOCD_LOCATOR_SIZE
    tail_size = min(file_size, trailer_size + ZIP64_EOCD_RECORD_SIZE)
    tail = fetch(bucket, key, file_size - tail_size, tail_size)

    eocd_record = tail[-EOCD_RECORD_SIZE:]
    if eocd_record[:4] != eocd_signature:
        raise zipfile.BadZipFile(
            "end-of-central-directory record not found at end of object "
            "(archives with a comment are not supported)")

    zip64_eocd_locator = tail[-trailer_size:-EOCD_RECORD_SIZE]
    if zip64_eocd_locator[:4] != zip64_locator_signature:
        # Plain (non-ZIP64) archive.
        cd_start, cd_size = get_central_directory_metadata_from_eocd(eocd_record)
        # An empty archive has no central directory bytes to download.
        central_directory = fetch(bucket, key, cd_start, cd_size) if cd_size else b""
        return zipfile.ZipFile(io.BytesIO(central_directory + eocd_record))

    zip64_eocd_record = tail[:-trailer_size]
    if (len(zip64_eocd_record) != ZIP64_EOCD_RECORD_SIZE
            or zip64_eocd_record[:4] != zip64_eocd_signature):
        raise zipfile.BadZipFile("ZIP64 end-of-central-directory record not found")
    cd_start, cd_size = get_central_directory_metadata_from_eocd64(zip64_eocd_record)
    central_directory = fetch(bucket, key, cd_start, cd_size) if cd_size else b""
    return zipfile.ZipFile(io.BytesIO(central_directory + zip64_eocd_record + zip64_eocd_locator + eocd_record))
def get_file_size(bucket, key):
    """Return the ``ContentLength`` reported by a HEAD request for the object."""
    head_response = s3.head_object(Bucket=bucket, Key=key)
    return head_response['ContentLength']
def fetch(bucket, key, start, length):
    """Download ``length`` bytes of ``s3://bucket/key`` starting at ``start``.

    A non-positive ``length`` returns ``b""`` without contacting S3: the
    inverted Range header it would otherwise produce (e.g. ``bytes=5-4``) is
    not a valid range request.
    """
    if length <= 0:
        return b""
    end = start + length - 1  # HTTP byte ranges are inclusive on both ends
    response = s3.get_object(Bucket=bucket, Key=key, Range="bytes=%d-%d" % (start, end))
    return response['Body'].read()
def get_central_directory_metadata_from_eocd(eocd):
    """Return ``(cd_start, cd_size)`` from an end-of-central-directory record.

    Bytes 12-15 hold the central directory size and bytes 16-19 its start
    offset.  Both are decoded as unsigned 32-bit little-endian integers so
    that values of 2 GiB and above do not come out negative.
    """
    cd_size, cd_start = struct.unpack_from("<II", eocd, 12)
    return cd_start, cd_size
def get_central_directory_metadata_from_eocd64(eocd64):
    """Return ``(cd_start, cd_size)`` from a ZIP64 end-of-central-directory record.

    Bytes 40-47 hold the central directory size and bytes 48-55 its start
    offset, both decoded as unsigned 64-bit little-endian integers.
    """
    cd_size, cd_start = struct.unpack_from("<QQ", eocd64, 40)
    return cd_start, cd_size
def parse_little_endian_to_int(little_endian_bytes):
    """Decode a 4-byte or 8-byte little-endian unsigned integer.

    ZIP size and offset fields are unsigned, so the unsigned struct formats
    are used; the signed ones turn values with the top bit set (for example
    an offset of 2 GiB or more) into negative numbers.  Any length other
    than 4 is decoded as an 8-byte value.
    """
    format_character = "I" if len(little_endian_bytes) == 4 else "Q"
    return struct.unpack("<" + format_character, little_endian_bytes)[0]
def print_zip_content(zip_file):
    """Print the names of all entries in ``zip_file`` as a Python list."""
    files = zip_file.namelist()
    print(f"Files: {files}")
我们有大小为 5-10GB 的 ZIP 文件。典型的 ZIP 文件有 5-10 个内部文件,每个未压缩的大小为 1-5 GB。
我有一套很好的 Python 工具来读取这些文件。基本上,我可以打开一个文件名,如果有 ZIP 文件,工具会在 ZIP 文件中搜索,然后打开压缩文件。这一切都相当透明。
我想将这些文件作为压缩文件存储在 Amazon S3 中。我可以获取 S3 文件的范围,所以应该可以获取 ZIP 中央目录(它是文件的末尾,所以我可以只读取最后的 64KiB),找到我想要的组件,下载它,然后直接流式传输到调用过程。
所以我的问题是,如何通过标准 Python ZipFile API 做到这一点?没有记录如何用支持 POSIX 语义的任意对象替换文件系统传输。这是否可以不重写模块?
下面是允许您像打开普通文件一样打开 Amazon S3 上的文件的代码。请注意,我使用 aws
命令,而不是 boto3
Python 模块。 (我无权访问 boto3。)您可以打开文件并在其上搜索。该文件缓存在本地。如果您使用 Python ZipFile API 打开文件并且它是一个 ZipFile,则您可以阅读各个部分。但是,您不能写入,因为 S3 不支持部分写入。
另外,我实现了 s3open(),它可以打开文件进行读写,但是没有实现 ZipFile 所需要的 seek 接口。
from urllib.parse import urlparse
from subprocess import run,Popen,PIPE
import copy
import json
import os
import tempfile
# Tools for reading and write files from Amazon S3 without boto or boto3
# http://boto.cloudhackers.com/en/latest/s3_tut.html
# but it is easier to use the aws cli, since it's configured to work.
def s3open(path, mode="r", encoding=None):
    """Open an S3 object as a non-seekable stream through the ``aws`` CLI.

    The function spawns ``aws s3 cp`` and returns one end of the child's pipe,
    so objects of any size can be streamed, but the result cannot seek.
    The AWS CLI is used instead of boto because it is already configured.

    Args:
        path: Location handed verbatim to ``aws s3 cp`` (e.g. ``s3://bucket/key``).
        mode: Must contain ``"r"`` (read) or ``"w"`` (write); add ``"b"`` for
            binary.  Append (``"a"``) and update (``"+"``) are rejected.
        encoding: Text encoding.  Defaults to ``"utf-8"`` in text mode and
            must be left as ``None`` in binary mode.

    Returns:
        The child's stdout pipe (read mode) or stdin pipe (write mode).

    Raises:
        AssertionError: If the mode/encoding combination is not supported.
        RuntimeError: If ``mode`` contains neither ``"r"`` nor ``"w"``.
    """
    from subprocess import PIPE, Popen

    # NOTE: these asserts are stripped under ``python -O``; they are kept so
    # callers keep seeing the same AssertionError as before.
    if "b" in mode:
        assert encoding is None
    elif encoding is None:
        encoding = "utf-8"
    assert 'a' not in mode
    assert '+' not in mode

    if "r" in mode:
        # "-" as the destination makes the CLI write the object to stdout.
        p = Popen(['aws', 's3', 'cp', path, '-'], stdout=PIPE, encoding=encoding)
        return p.stdout
    elif "w" in mode:
        # "-" as the source makes the CLI upload whatever is written to stdin.
        p = Popen(['aws', 's3', 'cp', '-', path], stdin=PIPE, encoding=encoding)
        return p.stdin
    else:
        raise RuntimeError("invalid mode:{}".format(mode))
CACHE_SIZE=4096 # big enough for front and back caches
MAX_READ=65536*16 # upper bound on the bytes returned by a single read(-1)
debug=False # set to True to trace aws commands, seeks and reads on stdout


class S3File:
    """Read-only, seekable file-like view of an S3 object, backed by the aws CLI.

    The first and last CACHE_SIZE bytes of the object are fetched once and kept
    in memory; every other read issues a ranged ``aws s3api get-object`` that
    lands in a local temporary file.  The object exposes enough of the file
    protocol (read/seek/tell/seekable) to be handed to ``zipfile.ZipFile``.
    """

    def __init__(self,name,mode='rb'):
        """Look up the object's size and prime the front and back caches.

        Args:
            name: ``s3://bucket/key`` URL of the object.
            mode: Accepted for ``open()``-style call compatibility; not inspected.

        Raises:
            RuntimeError: If the URL scheme is not ``s3``.
        """
        self.name = name
        self.url = urlparse(name)
        if self.url.scheme != 's3':
            raise RuntimeError("url scheme is {}; expecting s3".format(self.url.scheme))
        self.bucket = self.url.netloc
        self.key = self.url.path[1:]
        self.fpos = 0
        self.tf = tempfile.NamedTemporaryFile()
        cmd = ['aws','s3api','list-objects','--bucket',self.bucket,'--prefix',self.key,'--output','json']
        data = json.loads(Popen(cmd,encoding='utf8',stdout=PIPE).communicate()[0])
        # NOTE(review): this is a prefix listing, so the first entry is used
        # even if its key is merely prefixed by self.key -- confirm that is OK.
        file_info = data['Contents'][0]
        self.length = file_info['Size']
        self.ETag = file_info['ETag']

        # Load the caches.  An empty object has nothing to fetch, and a ranged
        # GET on it would be rejected.
        if self.length > 0:
            self.frontcache = self._readrange(0, min(CACHE_SIZE, self.length))
        else:
            self.frontcache = b""
        if self.length > CACHE_SIZE:
            self.backcache_start = self.length-CACHE_SIZE
            if debug: print("backcache starts at {}".format(self.backcache_start))
            self.backcache = self._readrange(self.backcache_start,CACHE_SIZE)
        else:
            self.backcache_start = None
            self.backcache = None

    def _readrange(self,start,length):
        """Fetch ``length`` bytes starting at ``start`` with a ranged get-object."""
        # The CLI writes the range into the named temporary file rather than a
        # pipe (pipes were not showing up in /dev/fd).
        cmd = ['aws','s3api','get-object','--bucket',self.bucket,'--key',self.key,'--output','json',
               '--range','bytes={}-{}'.format(start,start+length-1),self.tf.name]
        if debug:print(cmd)
        data = json.loads(Popen(cmd,encoding='utf8',stdout=PIPE).communicate()[0])
        if debug:print(data)
        # Re-open the file by name: the long-lived buffered handle in self.tf
        # could otherwise serve bytes left over from a previous range.
        with open(self.tf.name,'rb') as f:
            return f.read(length)

    def __repr__(self):
        return "FakeFile<name:{} url:{}>".format(self.name,self.url)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        # zipfile queries this on the underlying file object.
        return True

    def read(self,length=-1):
        """Return up to ``length`` bytes from the current position.

        A negative (or ``None``) length reads to the end of the object, capped
        at MAX_READ bytes per call.  Returns ``b""`` at or past end of file.
        """
        remaining = self.length - self.fpos
        if remaining <= 0:
            return b""
        if length is None or length < 0:
            length = min(MAX_READ, remaining)
        else:
            # Never ask S3 for bytes beyond the end of the object.
            length = min(length, remaining)
        if length == 0:
            return b""
        if debug:
            print("read: fpos={} length={}".format(self.fpos,length))

        # Can we satisfy from the front cache?
        if self.fpos + length <= len(self.frontcache):
            if debug:print("front cache")
            buf = self.frontcache[self.fpos:self.fpos+length]
            self.fpos += len(buf)
            if debug:print("return 1: buf=",buf)
            return buf

        # Can we satisfy from the back cache?
        if self.backcache and self.fpos >= self.backcache_start:
            if debug:print("back cache")
            offset = self.fpos - self.backcache_start
            buf = self.backcache[offset:offset + length]
            self.fpos += len(buf)
            if debug:print("return 2: buf=",buf)
            return buf

        buf = self._readrange(self.fpos, length)
        self.fpos += len(buf)
        if debug:print("return 3: buf=",buf)
        return buf

    def seek(self,offset,whence=0):
        """Move the read position and return the new absolute position.

        Raises:
            RuntimeError: If ``whence`` is not 0, 1 or 2.
            OSError: If the resulting position would be negative (zipfile
                relies on this to detect files too short for a ZIP trailer).
        """
        if debug:print("seek({},{})".format(offset,whence))
        if whence==0:
            target = offset
        elif whence==1:
            target = self.fpos + offset
        elif whence==2:
            target = self.length + offset
        else:
            raise RuntimeError("whence={}".format(whence))
        if target < 0:
            raise OSError("negative seek position {}".format(target))
        self.fpos = target
        if debug:print(" ={} (self.length={})".format(self.fpos,self.length))
        return self.fpos

    def tell(self):
        return self.fpos

    def write(self, *args, **kwargs):
        # Accept the usual write(data) call shape so callers get the intended
        # error instead of a TypeError about the argument count.
        raise RuntimeError("Write not supported")

    def flush(self):
        raise RuntimeError("Flush not supported")

    def close(self):
        """Release the local temporary file (safe to call more than once)."""
        tf = getattr(self, "tf", None)
        if tf is not None:
            tf.close()
        return
这是一种不需要获取整个文件的方法(完整版本可用 here)。
它确实需要 boto(或 boto3),除非您可以通过 AWS CLI 模拟带 Range 的远程 GET 请求(我想这也是很有可能的)。
import sys
import zlib
import zipfile
import io
import boto
from boto.s3.connection import OrdinaryCallingFormat
# range-fetches a S3 key
def fetch(key, start, len):
    """Return ``len`` bytes of ``key`` starting at byte offset ``start``.

    ``key`` is expected to provide ``get_contents_as_string(headers=...)``
    (a boto S3 key).  A non-positive ``len`` returns an empty byte string
    without contacting S3, because the inverted Range header it would produce
    (e.g. ``bytes=5-4``) is not a valid range request.
    """
    if len <= 0:
        return b""
    end = start + len - 1  # HTTP byte ranges are inclusive on both ends
    return key.get_contents_as_string(headers={"Range": "bytes=%d-%d" % (start, end)})
# parses 2 or 4 little-endian bytes into their corresponding integer value
def parse_int(bytes):
    """Decode a 2- or 4-byte little-endian unsigned integer.

    Works on both Python 2 ``str`` and Python 3 ``bytes`` input: going through
    ``bytearray`` yields integers when indexed on either version, whereas
    ``ord()`` on an indexed Python 3 ``bytes`` object raises TypeError.
    Only the first two bytes are used unless at least four are supplied.
    """
    data = bytearray(bytes)
    val = data[0] + (data[1] << 8)
    if len(data) > 3:
        val += (data[2] << 16) + (data[3] << 24)
    return val
"""
bucket: name of the bucket
key: path to zipfile inside bucket
entry: pathname of zip entry to be retrieved (path/to/subdir/file.name)
"""
# NOTE(review): `bucket`, `key` and `entry` are not defined anywhere in this
# snippet; they must be assigned (see the description above) before it runs.

# OrdinaryCallingFormat prevents certificate errors on bucket names with dots
_bucket = boto.connect_s3(calling_format=OrdinaryCallingFormat()).get_bucket(bucket)
_key = _bucket.get_key(key)

# fetch the last 22 bytes (end-of-central-directory record; assuming the comment field is empty)
size = _key.size
eocd = fetch(_key, size - 22, 22)

# start offset and size of the central directory
cd_start = parse_int(eocd[16:20])
cd_size = parse_int(eocd[12:16])

# fetch central directory, append EOCD, and open as zipfile!
cd = fetch(_key, cd_start, cd_size)
zip = zipfile.ZipFile(io.BytesIO(cd + eocd))

for zi in zip.filelist:
    if zi.filename == entry:
        # local file header starting at file name length + file content
        # (so we can reliably skip file name and extra fields)

        # in our "mock" zipfile, `header_offset`s are negative (probably because the leading content is missing)
        # so we have to add to it the CD start offset (`cd_start`) to get the actual offset
        file_head = fetch(_key, cd_start + zi.header_offset + 26, 4)
        name_len = parse_int(file_head[0:2])
        extra_len = parse_int(file_head[2:4])

        # 30 is the offset of the file name inside the local file header
        content = fetch(_key, cd_start + zi.header_offset + 30 + name_len + extra_len, zi.compress_size)

        # now `content` has the file entry you were looking for!
        # you should probably decompress it in context before passing it to some other program
        if zi.compress_type == zipfile.ZIP_DEFLATED:
            # -15 selects a raw deflate stream (no zlib header), as stored in ZIP entries
            print(zlib.decompressobj(-15).decompress(content))
        else:
            print(content)
        break
在您的情况下,您可能需要将获取的内容写入本地文件(由于文件较大),除非内存使用不是问题。
这是已给出解决方案的改进版本 - 现在它使用 boto3 并处理大于 4GiB 的文件:
import boto3
import io
import struct
import zipfile
# Module-level boto3 S3 client used by get_file_size() and fetch().
s3 = boto3.client('s3')
# Size in bytes of the end-of-central-directory (EOCD) record when the archive comment is empty.
EOCD_RECORD_SIZE = 22
# Size in bytes of the fixed part of the ZIP64 EOCD record.
ZIP64_EOCD_RECORD_SIZE = 56
# Size in bytes of the ZIP64 EOCD locator that sits between the ZIP64 EOCD record and the EOCD record.
ZIP64_EOCD_LOCATOR_SIZE = 20
# Largest value a 32-bit ZIP size/offset field can hold (2**32 - 1); larger archives need ZIP64.
MAX_STANDARD_ZIP_SIZE = 4_294_967_295
def lambda_handler(event, context=None):
    """Entry point: list the entries of the ZIP archive named by ``event``.

    Args:
        event: Mapping with the keys ``'bucket'`` and ``'key'`` identifying
            the archive in S3.
        context: Unused.  Accepted (with a default) because AWS Lambda invokes
            handlers with two positional arguments, ``(event, context)``;
            existing one-argument callers keep working.
    """
    bucket = event['bucket']
    key = event['key']
    zip_file = get_zip_file(bucket, key)
    print_zip_content(zip_file)
def get_zip_file(bucket, key):
    """Build a ``zipfile.ZipFile`` holding only the archive's directory records.

    Only the trailing records of the S3 object and its central directory are
    downloaded; they are concatenated into an in-memory file that ``zipfile``
    can open to list entries (entry data itself is not present).

    ZIP64 is detected from the ZIP64 locator signature rather than from the
    object size, so ZIP64 archives smaller than 4 GiB are handled too.  The
    archive comment is assumed to be empty, and the ZIP64 EOCD record is
    assumed to sit immediately before its locator.

    Raises:
        zipfile.BadZipFile: If the object is too short or its trailing records
            do not carry the expected signatures.
    """
    eocd_signature = b"PK\x05\x06"
    zip64_locator_signature = b"PK\x06\x07"
    zip64_eocd_signature = b"PK\x06\x06"

    file_size = get_file_size(bucket, key)
    if file_size < EOCD_RECORD_SIZE:
        raise zipfile.BadZipFile("object is too small to be a ZIP archive")

    # One ranged GET covers every trailing record we might need.
    trailer_size = EOCD_RECORD_SIZE + ZIP64_EOCD_LOCATOR_SIZE
    tail_size = min(file_size, trailer_size + ZIP64_EOCD_RECORD_SIZE)
    tail = fetch(bucket, key, file_size - tail_size, tail_size)

    eocd_record = tail[-EOCD_RECORD_SIZE:]
    if eocd_record[:4] != eocd_signature:
        raise zipfile.BadZipFile(
            "end-of-central-directory record not found at end of object "
            "(archives with a comment are not supported)")

    zip64_eocd_locator = tail[-trailer_size:-EOCD_RECORD_SIZE]
    if zip64_eocd_locator[:4] != zip64_locator_signature:
        # Plain (non-ZIP64) archive.
        cd_start, cd_size = get_central_directory_metadata_from_eocd(eocd_record)
        # An empty archive has no central directory bytes to download.
        central_directory = fetch(bucket, key, cd_start, cd_size) if cd_size else b""
        return zipfile.ZipFile(io.BytesIO(central_directory + eocd_record))

    zip64_eocd_record = tail[:-trailer_size]
    if (len(zip64_eocd_record) != ZIP64_EOCD_RECORD_SIZE
            or zip64_eocd_record[:4] != zip64_eocd_signature):
        raise zipfile.BadZipFile("ZIP64 end-of-central-directory record not found")
    cd_start, cd_size = get_central_directory_metadata_from_eocd64(zip64_eocd_record)
    central_directory = fetch(bucket, key, cd_start, cd_size) if cd_size else b""
    return zipfile.ZipFile(io.BytesIO(central_directory + zip64_eocd_record + zip64_eocd_locator + eocd_record))
def get_file_size(bucket, key):
    """Return the ``ContentLength`` reported by a HEAD request for the object."""
    head_response = s3.head_object(Bucket=bucket, Key=key)
    return head_response['ContentLength']
def fetch(bucket, key, start, length):
    """Download ``length`` bytes of ``s3://bucket/key`` starting at ``start``.

    A non-positive ``length`` returns ``b""`` without contacting S3: the
    inverted Range header it would otherwise produce (e.g. ``bytes=5-4``) is
    not a valid range request.
    """
    if length <= 0:
        return b""
    end = start + length - 1  # HTTP byte ranges are inclusive on both ends
    response = s3.get_object(Bucket=bucket, Key=key, Range="bytes=%d-%d" % (start, end))
    return response['Body'].read()
def get_central_directory_metadata_from_eocd(eocd):
    """Return ``(cd_start, cd_size)`` from an end-of-central-directory record.

    Bytes 12-15 hold the central directory size and bytes 16-19 its start
    offset.  Both are decoded as unsigned 32-bit little-endian integers so
    that values of 2 GiB and above do not come out negative.
    """
    cd_size, cd_start = struct.unpack_from("<II", eocd, 12)
    return cd_start, cd_size
def get_central_directory_metadata_from_eocd64(eocd64):
    """Return ``(cd_start, cd_size)`` from a ZIP64 end-of-central-directory record.

    Bytes 40-47 hold the central directory size and bytes 48-55 its start
    offset, both decoded as unsigned 64-bit little-endian integers.
    """
    cd_size, cd_start = struct.unpack_from("<QQ", eocd64, 40)
    return cd_start, cd_size
def parse_little_endian_to_int(little_endian_bytes):
    """Decode a 4-byte or 8-byte little-endian unsigned integer.

    ZIP size and offset fields are unsigned, so the unsigned struct formats
    are used; the signed ones turn values with the top bit set (for example
    an offset of 2 GiB or more) into negative numbers.  Any length other
    than 4 is decoded as an 8-byte value.
    """
    format_character = "I" if len(little_endian_bytes) == 4 else "Q"
    return struct.unpack("<" + format_character, little_endian_bytes)[0]
def print_zip_content(zip_file):
    """Print the names of all entries in ``zip_file`` as a Python list."""
    files = zip_file.namelist()
    print(f"Files: {files}")