如何从 s3 流式传输大型 gzip .tsv 文件,对其进行处理,然后写回 s3 上的新文件?
How to stream a large gzipped .tsv file from s3, process it, and write back to a new file on s3?
我有一个大文件 s3://my-bucket/in.tsv.gz
我想加载和处理,将其处理后的版本写回到 s3 输出文件 s3://my-bucket/out.tsv.gz
.
- 如何直接从 s3 精简
in.tsv.gz
而无需将所有文件加载到内存(内存放不下)
- 如何将处理后的 gzip 流直接写入 s3?
在下面的代码中,我展示了我是如何考虑从 s3 加载输入的 gzip 数据帧,以及如果 .tsv
位于本地 bucket_dir_local = ./
.[=17,我将如何编写 .tsv
=]
import pandas as pd
import s3fs
import os
import gzip
import csv
import io
bucket_dir = 's3://my-bucket/annotations/'
df = pd.read_csv(os.path.join(bucket_dir, 'in.tsv.gz'), sep='\t', compression="gzip")
bucket_dir_local='./'
# not sure how to do it with an s3 path
with gzip.open(os.path.join(bucket_dir_local, 'out.tsv.gz'), "w") as f:
with io.TextIOWrapper(f, encoding='utf-8') as wrapper:
w = csv.DictWriter(wrapper, fieldnames=['test', 'testing'], extrasaction="ignore")
w.writeheader()
for index, row in df.iterrows():
my_dict = {"test": index, "testing": row[6]}
w.writerow(my_dict)
编辑:smart_open 看起来不错。
要下载文件,您可以stream the S3 object directly in python。我建议阅读整个 post 但其中的一些关键行
import boto3
s3 = boto3.client('s3', aws_access_key_id='mykey', aws_secret_access_key='mysecret') # your authentication may vary
obj = s3.get_object(Bucket='my-bucket', Key='my/precious/object')
import gzip
body = obj['Body']
with gzip.open(body, 'rt') as gf:
for ln in gf:
process(ln)
不幸的是,S3 不支持真正的流式输入,但 this SO answer 有一个将文件分块并将每个块发送到 S3 的实现。虽然不是“真正的流”,但它可以让您上传大文件而无需将整个文件保存在内存中
这是一个从 s3 读取文件并使用 smart_open
将其写回 s3 的虚拟示例
from smart_open import open
import os
bucket_dir = "s3://my-bucket/annotations/"
with open(os.path.join(bucket_dir, "in.tsv.gz"), "rb") as fin:
with open(
os.path.join(bucket_dir, "out.tsv.gz"), "wb"
) as fout:
for line in fin:
l = [i.strip() for i in line.decode().split("\t")]
string = "\t".join(l) + "\n"
fout.write(string.encode())
我有一个大文件 s3://my-bucket/in.tsv.gz
我想加载和处理,将其处理后的版本写回到 s3 输出文件 s3://my-bucket/out.tsv.gz
.
- 如何直接从 s3 精简
in.tsv.gz
而无需将所有文件加载到内存(内存放不下) - 如何将处理后的 gzip 流直接写入 s3?
在下面的代码中,我展示了我是如何考虑从 s3 加载输入的 gzip 数据帧,以及如果 .tsv
位于本地 bucket_dir_local = ./
.[=17,我将如何编写 .tsv
=]
import pandas as pd
import s3fs
import os
import gzip
import csv
import io
bucket_dir = 's3://my-bucket/annotations/'
df = pd.read_csv(os.path.join(bucket_dir, 'in.tsv.gz'), sep='\t', compression="gzip")
bucket_dir_local='./'
# not sure how to do it with an s3 path
with gzip.open(os.path.join(bucket_dir_local, 'out.tsv.gz'), "w") as f:
with io.TextIOWrapper(f, encoding='utf-8') as wrapper:
w = csv.DictWriter(wrapper, fieldnames=['test', 'testing'], extrasaction="ignore")
w.writeheader()
for index, row in df.iterrows():
my_dict = {"test": index, "testing": row[6]}
w.writerow(my_dict)
编辑:smart_open 看起来不错。
要下载文件,您可以stream the S3 object directly in python。我建议阅读整个 post 但其中的一些关键行
import boto3
s3 = boto3.client('s3', aws_access_key_id='mykey', aws_secret_access_key='mysecret') # your authentication may vary
obj = s3.get_object(Bucket='my-bucket', Key='my/precious/object')
import gzip
body = obj['Body']
with gzip.open(body, 'rt') as gf:
for ln in gf:
process(ln)
不幸的是,S3 不支持真正的流式输入,但 this SO answer 有一个将文件分块并将每个块发送到 S3 的实现。虽然不是“真正的流”,但它可以让您上传大文件而无需将整个文件保存在内存中
这是一个从 s3 读取文件并使用 smart_open
from smart_open import open
import os
bucket_dir = "s3://my-bucket/annotations/"
with open(os.path.join(bucket_dir, "in.tsv.gz"), "rb") as fin:
with open(
os.path.join(bucket_dir, "out.tsv.gz"), "wb"
) as fout:
for line in fin:
l = [i.strip() for i in line.decode().split("\t")]
string = "\t".join(l) + "\n"
fout.write(string.encode())