使用 boto 从 S3 逐行读取文件?
Read a file line by line from S3 using boto?
我在 S3 中有一个 csv 文件,我正在尝试读取 header 行以获得大小(这些文件是由我们的用户创建的,因此它们几乎可以是任何大小)。有没有办法使用 boto 来做到这一点?我想也许我可以给我们一个 python BufferedReader,但我不知道如何从 S3 密钥打开流。任何建议都会很棒。谢谢!
boto 似乎有一个 read()
函数可以做到这一点。这是一些对我有用的代码:
>>> import boto
>>> from boto.s3.key import Key
>>> conn = boto.connect_s3('ap-southeast-2')
>>> bucket = conn.get_bucket('bucket-name')
>>> k = Key(bucket)
>>> k.key = 'filename.txt'
>>> k.open()
>>> k.read(10)
'This text '
调用 read(n)
returns 来自 object 的下 n 个字节。
当然,这不会自动 return "the header line",但您可以使用足够大的数字调用它,至少 return header 行.
您可能会发现 https://pypi.python.org/pypi/smart_open 对您的任务有用。
来自文档:
for line in smart_open.smart_open('s3://mybucket/mykey.txt'):
print line
这是一个实际逐行传输数据的解决方案:
from io import TextIOWrapper
from gzip import GzipFile
...
# get StreamingBody from botocore.response
response = s3.get_object(Bucket=bucket, Key=key)
# if gzipped
gzipped = GzipFile(None, 'rb', fileobj=response['Body'])
data = TextIOWrapper(gzipped)
for line in data:
# process line
使用 boto3,您可以访问原始流并逐行阅读。
请注意,出于某种原因
,原始流是私有的 属性
s3 = boto3.resource('s3', aws_access_key_id='xxx', aws_secret_access_key='xxx')
obj = s3.Object('bucket name', 'file key')
obj.get()['Body']._raw_stream.readline() # line 1
obj.get()['Body']._raw_stream.readline() # line 2
obj.get()['Body']._raw_stream.readline() # line 3...
如果您想读取具有特定存储桶前缀(即在 "subfolder" 中)的多个文件(逐行),您可以这样做:
s3 = boto3.resource('s3', aws_access_key_id='<key_id>', aws_secret_access_key='<access_key>')
bucket = s3.Bucket('<bucket_name>')
for obj in bucket.objects.filter(Prefix='<your prefix>'):
for line in obj.get()['Body'].read().splitlines():
print(line.decode('utf-8'))
这里的行是字节,所以我正在解码它们;但如果它们已经是一个字符串,你可以跳过它。
读取文件最动态且成本最低的方法是读取每个字节,直到找到所需的行数。
line_count = 0
line_data_bytes = b''
while line_count < 2 :
incoming = correlate_file_obj['Body'].read(1)
if incoming == b'\n':
line_count = line_count + 1
line_data_bytes = line_data_bytes + incoming
logger.debug("read bytes:")
logger.debug(line_data_bytes)
line_data = line_data_bytes.split(b'\n')
如果 header 大小可以改变,您就不需要猜测 header 大小,您最终不会下载整个文件,也不需要第 3 方工具.当然,您需要确保文件中的行分隔符正确,并且您正在读取正确的字节数才能找到它。
使用 boto3:
s3 = boto3.resource('s3')
obj = s3.Object(BUCKET, key)
for line in obj.get()['Body']._raw_stream:
# do something with line
我知道这是一个很老的问题。
不过就目前而言,我们可以直接使用s3_conn.get_object(Bucket=bucket, Key=key)['Body'].iter_lines()
扩展 kooshywoosh 的回答:直接从普通二进制文件在 StreamingBody 上使用 TextIOWrapper(这非常有用)是不可能的,因为您会收到以下错误:
"builtins.AttributeError: 'StreamingBody' object has no attribute 'readable'"
但是,您可以使用在 botocore 的 github 页面上 this 长期存在的问题中提到的以下技巧,并围绕 StreamingBody 定义一个非常简单的包装器 class:
from io import RawIOBase
...
class StreamingBodyIO(RawIOBase):
"""Wrap a boto StreamingBody in the IOBase API."""
def __init__(self, body):
self.body = body
def readable(self):
return True
def read(self, n=-1):
n = None if n < 0 else n
return self.body.read(n)
然后,您可以简单地使用以下代码:
from io import TextIOWrapper
...
# get StreamingBody from botocore.response
response = s3.get_object(Bucket=bucket, Key=key)
data = TextIOWrapper(StreamingBodyIO(response))
for line in data:
# process line
stdlib 中的 codecs
module 提供了一种将字节流编码为文本流的简单方法,并提供了一个生成器来逐行检索此文本。它可以毫不费力地与 S3 一起使用:
import codecs
import boto3
s3 = boto3.resource("s3")
s3_object = s3.Object('my-bucket', 'a/b/c.txt')
line_stream = codecs.getreader("utf-8")
for line in line_stream(s3_object.get()['Body']):
print(line)
我在 S3 中有一个 csv 文件,我正在尝试读取 header 行以获得大小(这些文件是由我们的用户创建的,因此它们几乎可以是任何大小)。有没有办法使用 boto 来做到这一点?我想也许我可以给我们一个 python BufferedReader,但我不知道如何从 S3 密钥打开流。任何建议都会很棒。谢谢!
boto 似乎有一个 read()
函数可以做到这一点。这是一些对我有用的代码:
>>> import boto
>>> from boto.s3.key import Key
>>> conn = boto.connect_s3('ap-southeast-2')
>>> bucket = conn.get_bucket('bucket-name')
>>> k = Key(bucket)
>>> k.key = 'filename.txt'
>>> k.open()
>>> k.read(10)
'This text '
调用 read(n)
returns 来自 object 的下 n 个字节。
当然,这不会自动 return "the header line",但您可以使用足够大的数字调用它,至少 return header 行.
您可能会发现 https://pypi.python.org/pypi/smart_open 对您的任务有用。
来自文档:
for line in smart_open.smart_open('s3://mybucket/mykey.txt'):
print line
这是一个实际逐行传输数据的解决方案:
from io import TextIOWrapper
from gzip import GzipFile
...
# get StreamingBody from botocore.response
response = s3.get_object(Bucket=bucket, Key=key)
# if gzipped
gzipped = GzipFile(None, 'rb', fileobj=response['Body'])
data = TextIOWrapper(gzipped)
for line in data:
# process line
使用 boto3,您可以访问原始流并逐行阅读。 请注意,出于某种原因
,原始流是私有的 属性s3 = boto3.resource('s3', aws_access_key_id='xxx', aws_secret_access_key='xxx')
obj = s3.Object('bucket name', 'file key')
obj.get()['Body']._raw_stream.readline() # line 1
obj.get()['Body']._raw_stream.readline() # line 2
obj.get()['Body']._raw_stream.readline() # line 3...
如果您想读取具有特定存储桶前缀(即在 "subfolder" 中)的多个文件(逐行),您可以这样做:
s3 = boto3.resource('s3', aws_access_key_id='<key_id>', aws_secret_access_key='<access_key>')
bucket = s3.Bucket('<bucket_name>')
for obj in bucket.objects.filter(Prefix='<your prefix>'):
for line in obj.get()['Body'].read().splitlines():
print(line.decode('utf-8'))
这里的行是字节,所以我正在解码它们;但如果它们已经是一个字符串,你可以跳过它。
读取文件最动态且成本最低的方法是读取每个字节,直到找到所需的行数。
line_count = 0
line_data_bytes = b''
while line_count < 2 :
incoming = correlate_file_obj['Body'].read(1)
if incoming == b'\n':
line_count = line_count + 1
line_data_bytes = line_data_bytes + incoming
logger.debug("read bytes:")
logger.debug(line_data_bytes)
line_data = line_data_bytes.split(b'\n')
如果 header 大小可以改变,您就不需要猜测 header 大小,您最终不会下载整个文件,也不需要第 3 方工具.当然,您需要确保文件中的行分隔符正确,并且您正在读取正确的字节数才能找到它。
使用 boto3:
s3 = boto3.resource('s3')
obj = s3.Object(BUCKET, key)
for line in obj.get()['Body']._raw_stream:
# do something with line
我知道这是一个很老的问题。
不过就目前而言,我们可以直接使用s3_conn.get_object(Bucket=bucket, Key=key)['Body'].iter_lines()
扩展 kooshywoosh 的回答:直接从普通二进制文件在 StreamingBody 上使用 TextIOWrapper(这非常有用)是不可能的,因为您会收到以下错误:
"builtins.AttributeError: 'StreamingBody' object has no attribute 'readable'"
但是,您可以使用在 botocore 的 github 页面上 this 长期存在的问题中提到的以下技巧,并围绕 StreamingBody 定义一个非常简单的包装器 class:
from io import RawIOBase
...
class StreamingBodyIO(RawIOBase):
"""Wrap a boto StreamingBody in the IOBase API."""
def __init__(self, body):
self.body = body
def readable(self):
return True
def read(self, n=-1):
n = None if n < 0 else n
return self.body.read(n)
然后,您可以简单地使用以下代码:
from io import TextIOWrapper
...
# get StreamingBody from botocore.response
response = s3.get_object(Bucket=bucket, Key=key)
data = TextIOWrapper(StreamingBodyIO(response))
for line in data:
# process line
stdlib 中的 codecs
module 提供了一种将字节流编码为文本流的简单方法,并提供了一个生成器来逐行检索此文本。它可以毫不费力地与 S3 一起使用:
import codecs
import boto3
s3 = boto3.resource("s3")
s3_object = s3.Object('my-bucket', 'a/b/c.txt')
line_stream = codecs.getreader("utf-8")
for line in line_stream(s3_object.get()['Body']):
print(line)