合并每小时生成的百万个 S3 文件

Merge million S3 files generated hourly

我每小时创建数百万个文件。每个文件有一行数据。这些文件需要合并成一个文件。

我尝试过以下方式:-

  1. 使用 aws s3 cp 下载文件一小时。
  2. 使用bash 命令合并文件。 或者
  3. 使用 python 脚本合并文件。

Airflow on Kubernetes (EKS) 中 运行 这个每小时的工作。这需要一个多小时才能完成,并且正在积压。另一个问题是它经常导致 EC2 节点由于 CPU 和内存使用率过高而停止响应。 运行完成这项工作最有效的方法是什么?

python脚本供参考:-

from os import listdir
import sys
# from tqdm import tqdm

files = listdir('./temp/')
dest = sys.argv[1]

data = []

tot_len = len(files)
percent = tot_len//100

for i, file in enumerate(files):
    if(i % percent == 0):
        print(f'{i/percent}% complete.')
    with open('./temp/'+file, 'r') as f:
        d = f.read()
        data.append(d)

result = '\n'.join(data)

with open(dest, 'w') as f:
    f.write(result)

可扩展且可靠的方法是:

  • 将 Amazon S3 存储桶配置为 在新文件到达时触发 AWS Lambda 函数
  • 在 AWS Lambda 函数中,读取文件内容并将其发送到 Amazon Kinesis Firehose 流。然后,删除输入文件。
  • 配置 Amazon Kinesis Firehose 流以缓冲输入数据并根据时间段(最多 15 分钟)或数据大小(最多 128MB)输出新文件

参见:Amazon Kinesis Data Firehose Data Delivery - Amazon Kinesis Data Firehose

这将不会每小时生成文件 -- 文件数量将取决于传入数据的大小。

如果您需要创建每小时文件,您可以考虑在 Firehose 的输出文件上使用 Amazon Athena。 Athena 允许您 运行 SQL 查询存储在 Amazon S3 中的文件。因此,如果输入文件包含日期列,它可以 select 仅显示特定时间的数据。 (您可以为此编写 Lambda 函数代码,添加一个日期列。)

我希望您应该非常认真地考虑遵循您得到的特定于 AWS 的答案中的想法。我会将此回复添加为评论,但无法在评论中连贯地显示缩进代码。

关于您的 Python 脚本,您正在构建一个巨大的字符串,其字符数等于所有输入文件中的字符总数。所以当然内存使用量至少会增长那么大。

读取文件内容后立即写出文件内容会占用更少的内存(请注意,此代码未经测试 - 可能有错别字,我不知道):

with open(dest, 'w') as fout:
    for i, file in enumerate(files):
        if(i % percent == 0):
            print(f'{i/percent}% complete.')
        with open('./temp/'+file, 'r') as fin:
            fout.write(fin.read())

如果您继续这样做,还可以尝试一件事:改为以二进制模式打开文件('wb''rb')。 可能 保存无用的文本模式字符层 decoding/encoding。我假设您只想将原始字节粘贴在一起。

把它放在那里以防其他人需要它。

我尽我所能优化了合并代码,但瓶颈仍然是读取或下载 s3 文件,即使使用官方 aws cli 也非常慢。

我找到了一个库 s5cmd,它非常快,因为它充分利用了多处理和多线程,它解决了我的问题。

Link :- https://github.com/peak/s5cmd