如何正确使用 AWS EMR (Pyspark) 执行增量加载?
How to perform incremental load using AWS EMR (Pyspark) the right way?
我的所有数据都在 S3 位置 s3://sample/input_data
我通过部署 AWS EMR 和使用 PySpark 进行 ETL。
PySpark 脚本非常简单。
- 我加载
s3://sample/input_data
作为 spark 数据帧。
- 按一列划分。
- 将数据帧保存为 Parquet 文件,在 'append' 模式下写入选项到 S3 位置
s3://sample/output_data
- 然后将
s3://sample/input_data
中的所有文件复制到s3://sample/archive_data
并删除s3://sample/input_data
中的所有数据
所以当新数据进入s3://sample/input_data
时,它只处理新文件并将其保存在s3://sample/output_data
分区。
是否有任何内置闩锁 AWS EMR 提供我应该知道我可以使用它而不是执行我的 PySpark 脚本的最后一步?
您可以将 Delta Lake 用于这些目的,或者按 s3://sample/input_data/year=2021/month=11/day=11/
之类的时间间隔对输入目录进行分区,以便您只处理该时间间隔内的数据。
您可以在电子病历中使用步进功能。
罐子将是脚本-runner.jar
s3://.elasticmapreduce/libs/script-runner/script-runner.jar 其中是您的 Amazon EMR 集群所在的区域。您可以使用脚本-runner.jar 到 运行 保存在本地或集群上的 Amazon S3 上的脚本
您必须将 shell 脚本指定为 运行。在你的情况下 cp 命令
我的所有数据都在 S3 位置 s3://sample/input_data
我通过部署 AWS EMR 和使用 PySpark 进行 ETL。
PySpark 脚本非常简单。
- 我加载
s3://sample/input_data
作为 spark 数据帧。 - 按一列划分。
- 将数据帧保存为 Parquet 文件,在 'append' 模式下写入选项到 S3 位置
s3://sample/output_data
- 然后将
s3://sample/input_data
中的所有文件复制到s3://sample/archive_data
并删除s3://sample/input_data
中的所有数据
所以当新数据进入s3://sample/input_data
时,它只处理新文件并将其保存在s3://sample/output_data
分区。
是否有任何内置闩锁 AWS EMR 提供我应该知道我可以使用它而不是执行我的 PySpark 脚本的最后一步?
您可以将 Delta Lake 用于这些目的,或者按 s3://sample/input_data/year=2021/month=11/day=11/
之类的时间间隔对输入目录进行分区,以便您只处理该时间间隔内的数据。
您可以在电子病历中使用步进功能。 罐子将是脚本-runner.jar s3://.elasticmapreduce/libs/script-runner/script-runner.jar 其中是您的 Amazon EMR 集群所在的区域。您可以使用脚本-runner.jar 到 运行 保存在本地或集群上的 Amazon S3 上的脚本
您必须将 shell 脚本指定为 运行。在你的情况下 cp 命令