How to generate and write stream data to S3 on the fly with Python and boto3?
How can I write data that is generated on the fly to S3 using Python and boto3?
I want to implement something like this:
from io import BytesIO
from boto3 import ???

s3_opened_stream = ???
for i in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'x', 'y', 'z']:
    data = (i * 1000).encode('utf-8')
    s3_opened_stream.append_chunk(BytesIO(data))

# OR something like
with ??? as s3_opened_stream:
    for i in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'x', 'y', 'z']:
        data = (i * 1000).encode('utf-8')
        s3_opened_stream.append_chunk(BytesIO(data))
and expect a resulting file like this:
aaaaaa......
bbbbbb......
cccccc......
.....
where each line is appended to the same S3 object.
I looked at examples on the internet, and everywhere the data is fully generated in a first step and only uploaded to S3 afterwards.
I tried to adapt those examples, for instance:
from io import BytesIO
from boto3.s3.transfer import TransferConfig
from boto3 import resource

config = TransferConfig(
    # lowest possible threshold to force a multipart upload in any case
    multipart_threshold=1,
    max_concurrency=1,
    multipart_chunksize=5242880,
    use_threads=False
)

bucket = resource(
    service_name='s3',
    region_name=params['region_name'],
    endpoint_url=params['endpoint_url'],
    aws_access_key_id=params['aws_access_key_id'],
    aws_secret_access_key=params['aws_secret_access_key']
).Bucket(params['bucket_name'])

with BytesIO() as one_chunk:
    for line in lines:
        # write new line inside one_chunk
        ...
        # rewind the buffer and write its data to the object
        one_chunk.seek(0)
        bucket.upload_fileobj(one_chunk, obj_path, Config=config, Callback=None)
        # clear chunk data to release RAM
        one_chunk.seek(0)
        one_chunk.truncate(0)
But upload_fileobj rewrites the object with the new lines every time instead of appending to it.
In other words, I want to open the S3 object in append mode (like with open('path', mode='a')) and append the lines as they are generated in the loop, because the real generated file is huge and cannot be held entirely in RAM.
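For reference, S3 objects are immutable and cannot be appended to in place; the closest thing to "append mode" with plain boto3 is its low-level multipart-upload API. Below is a rough sketch, not the approach I ended up using: the default-credential client, the bucket/key names, and the flush_part helper are placeholders, and it assumes every part except the last is at least 5 MB (the S3 minimum part size).

import io
import boto3

# hypothetical client and target object, adjust to your environment
s3 = boto3.client('s3')
bucket, key = 'bucket_name', 'out.csv'

# start a multipart upload; parts can be sent one by one as data is generated
mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
parts = []
part_number = 1
buffer = io.BytesIO()

def flush_part(buf, number):
    # upload the buffered bytes as one part and remember its ETag
    buf.seek(0)
    resp = s3.upload_part(
        Bucket=bucket, Key=key,
        UploadId=mpu['UploadId'],
        PartNumber=number,
        Body=buf.read(),
    )
    return {'ETag': resp['ETag'], 'PartNumber': number}

for i in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'x', 'y', 'z']:
    buffer.write((i * 1000).encode('utf-8') + b'\n')
    # every part except the last must be at least 5 MB
    if buffer.tell() >= 5 * 1024 * 1024:
        parts.append(flush_part(buffer, part_number))
        part_number += 1
        buffer = io.BytesIO()

# upload whatever is left as the final (possibly smaller) part
if buffer.tell() > 0:
    parts.append(flush_part(buffer, part_number))

# stitch the parts together into the final object
s3.complete_multipart_upload(
    Bucket=bucket, Key=key,
    UploadId=mpu['UploadId'],
    MultipartUpload={'Parts': parts},
)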
In the end I gave up trying to understand the boto3 code. It is very complicated, and its classes cannot easily be extended.
It looks like smart_open is the simplest solution:
I tested this code with a ~4 GB input file:
from boto3 import Session
from smart_open import open

c = Session(
    aws_access_key_id=id,
    aws_secret_access_key=key
).client('s3', endpoint_url='http://minio.local:9000')  # I use minio for testing

read_path = "bucket_name/in.csv"
write_path = "bucket_name/out.csv"

with open(f"s3://{read_path}", mode='rb', transport_params={'client': c}) as fr:
    with open(f"s3://{write_path}", mode='wb', transport_params={'client': c}) as fw:
        for line in fr:
            fw.write(line)
It works like a charm. Memory usage peaks at around ~350 MB (checked via htop's RES value):
RES: How much physical RAM the process is using, measured in kilobytes.
RES stands for the resident size, which is an accurate representation of how much actual physical memory a process is consuming. (This also corresponds directly to the %MEM column)
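For the original use case (data generated on the fly rather than read from another file), a minimal write-only sketch along the same lines might look like the following; the bucket name, credentials, and endpoint are placeholders reused from the snippet above.

from boto3 import Session
from smart_open import open

# same client setup as above; endpoint_url is only needed for MinIO-style setups
c = Session(
    aws_access_key_id=id,
    aws_secret_access_key=key
).client('s3', endpoint_url='http://minio.local:9000')

# smart_open buffers the written bytes and uploads them to S3 as multipart
# parts behind the scenes, so memory usage stays bounded
with open("s3://bucket_name/out.csv", mode='wb', transport_params={'client': c}) as fw:
    for i in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'x', 'y', 'z']:
        fw.write((i * 1000).encode('utf-8') + b'\n')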