合并每小时生成的百万个 S3 文件
Merge million S3 files generated hourly
我每小时创建数百万个文件。每个文件有一行数据。这些文件需要合并成一个文件。
我尝试过以下方式:-
- 使用 aws s3 cp 下载文件一小时。
- 使用bash 命令合并文件。
或者
- 使用 python 脚本合并文件。
Airflow on Kubernetes (EKS) 中 运行 这个每小时的工作。这需要一个多小时才能完成,并且正在积压。另一个问题是它经常导致 EC2 节点由于 CPU 和内存使用率过高而停止响应。 运行完成这项工作最有效的方法是什么?
python脚本供参考:-
from os import listdir
import sys
# from tqdm import tqdm
files = listdir('./temp/')
dest = sys.argv[1]
data = []
tot_len = len(files)
percent = tot_len//100
for i, file in enumerate(files):
if(i % percent == 0):
print(f'{i/percent}% complete.')
with open('./temp/'+file, 'r') as f:
d = f.read()
data.append(d)
result = '\n'.join(data)
with open(dest, 'w') as f:
f.write(result)
可扩展且可靠的方法是:
- 将 Amazon S3 存储桶配置为 在新文件到达时触发 AWS Lambda 函数
- 在 AWS Lambda 函数中,读取文件内容并将其发送到 Amazon Kinesis Firehose 流。然后,删除输入文件。
- 配置 Amazon Kinesis Firehose 流以缓冲输入数据并根据时间段(最多 15 分钟)或数据大小(最多 128MB)输出新文件
参见:Amazon Kinesis Data Firehose Data Delivery - Amazon Kinesis Data Firehose
这将不会每小时生成文件 -- 文件数量将取决于传入数据的大小。
如果您需要创建每小时文件,您可以考虑在 Firehose 的输出文件上使用 Amazon Athena。 Athena 允许您 运行 SQL 查询存储在 Amazon S3 中的文件。因此,如果输入文件包含日期列,它可以 select 仅显示特定时间的数据。 (您可以为此编写 Lambda 函数代码,添加一个日期列。)
我希望您应该非常认真地考虑遵循您得到的特定于 AWS 的答案中的想法。我会将此回复添加为评论,但无法在评论中连贯地显示缩进代码。
关于您的 Python 脚本,您正在构建一个巨大的字符串,其字符数等于所有输入文件中的字符总数。所以当然内存使用量至少会增长那么大。
读取文件内容后立即写出文件内容会占用更少的内存(请注意,此代码未经测试 - 可能有错别字,我不知道):
with open(dest, 'w') as fout:
for i, file in enumerate(files):
if(i % percent == 0):
print(f'{i/percent}% complete.')
with open('./temp/'+file, 'r') as fin:
fout.write(fin.read())
如果您继续这样做,还可以尝试一件事:改为以二进制模式打开文件('wb'
和 'rb'
)。 可能 保存无用的文本模式字符层 decoding/encoding。我假设您只想将原始字节粘贴在一起。
把它放在那里以防其他人需要它。
我尽我所能优化了合并代码,但瓶颈仍然是读取或下载 s3 文件,即使使用官方 aws cli 也非常慢。
我找到了一个库 s5cmd,它非常快,因为它充分利用了多处理和多线程,它解决了我的问题。
Link :- https://github.com/peak/s5cmd
我每小时创建数百万个文件。每个文件有一行数据。这些文件需要合并成一个文件。
我尝试过以下方式:-
- 使用 aws s3 cp 下载文件一小时。
- 使用bash 命令合并文件。 或者
- 使用 python 脚本合并文件。
Airflow on Kubernetes (EKS) 中 运行 这个每小时的工作。这需要一个多小时才能完成,并且正在积压。另一个问题是它经常导致 EC2 节点由于 CPU 和内存使用率过高而停止响应。 运行完成这项工作最有效的方法是什么?
python脚本供参考:-
from os import listdir
import sys
# from tqdm import tqdm
files = listdir('./temp/')
dest = sys.argv[1]
data = []
tot_len = len(files)
percent = tot_len//100
for i, file in enumerate(files):
if(i % percent == 0):
print(f'{i/percent}% complete.')
with open('./temp/'+file, 'r') as f:
d = f.read()
data.append(d)
result = '\n'.join(data)
with open(dest, 'w') as f:
f.write(result)
可扩展且可靠的方法是:
- 将 Amazon S3 存储桶配置为 在新文件到达时触发 AWS Lambda 函数
- 在 AWS Lambda 函数中,读取文件内容并将其发送到 Amazon Kinesis Firehose 流。然后,删除输入文件。
- 配置 Amazon Kinesis Firehose 流以缓冲输入数据并根据时间段(最多 15 分钟)或数据大小(最多 128MB)输出新文件
参见:Amazon Kinesis Data Firehose Data Delivery - Amazon Kinesis Data Firehose
这将不会每小时生成文件 -- 文件数量将取决于传入数据的大小。
如果您需要创建每小时文件,您可以考虑在 Firehose 的输出文件上使用 Amazon Athena。 Athena 允许您 运行 SQL 查询存储在 Amazon S3 中的文件。因此,如果输入文件包含日期列,它可以 select 仅显示特定时间的数据。 (您可以为此编写 Lambda 函数代码,添加一个日期列。)
我希望您应该非常认真地考虑遵循您得到的特定于 AWS 的答案中的想法。我会将此回复添加为评论,但无法在评论中连贯地显示缩进代码。
关于您的 Python 脚本,您正在构建一个巨大的字符串,其字符数等于所有输入文件中的字符总数。所以当然内存使用量至少会增长那么大。
读取文件内容后立即写出文件内容会占用更少的内存(请注意,此代码未经测试 - 可能有错别字,我不知道):
with open(dest, 'w') as fout:
for i, file in enumerate(files):
if(i % percent == 0):
print(f'{i/percent}% complete.')
with open('./temp/'+file, 'r') as fin:
fout.write(fin.read())
如果您继续这样做,还可以尝试一件事:改为以二进制模式打开文件('wb'
和 'rb'
)。 可能 保存无用的文本模式字符层 decoding/encoding。我假设您只想将原始字节粘贴在一起。
把它放在那里以防其他人需要它。
我尽我所能优化了合并代码,但瓶颈仍然是读取或下载 s3 文件,即使使用官方 aws cli 也非常慢。
我找到了一个库 s5cmd,它非常快,因为它充分利用了多处理和多线程,它解决了我的问题。
Link :- https://github.com/peak/s5cmd