附加新数据时如何避免从 S3 读取旧文件?

How to avoid reading old files from S3 when appending new data?

2 小时一次,spark 作业 运行 将一些 tgz 文件转换为 parquet。 该作业将新数据附加到 s3 中的现有镶木地板中:

df.write.mode("append").partitionBy("id","day").parquet("s3://myBucket/foo.parquet")

在 spark-submit 输出中,我可以看到大量时间花在阅读旧的 parquet 文件上,例如:

16/11/27 14:06:15 INFO S3NativeFileSystem: Opening 's3://myBucket/foo.parquet/id=123/day=2016-11-26/part-r-00003-b20752e9-5d70-43f5-b8b4-50b5b4d0c7da.snappy.parquet' for reading

16/11/27 14:06:15 INFO S3NativeFileSystem: Stream for key 'foo.parquet/id=123/day=2016-11-26/part-r-00003-e80419de-7019-4859-bbe7-dcd392f6fcd3.snappy.parquet' seeking to position '149195444'

看起来这个操作每个文件花费不到 1 秒,但是文件的数量随着时间的推移而增加(每次追加都会添加新文件),这让我觉得我的代码将无法扩展。

如果我只需要附加新数据,有什么想法可以避免从 s3 读取旧的 parquet 文件吗?

我使用 EMR 4.8.2 和 DirectParquetOutputCommitter:

sc._jsc.hadoopConfiguration().set('spark.sql.parquet.output.committer.class', 'org.apache.spark.sql.parquet.DirectParquetOutputCommitter')

我通过将数据帧写入 EMR HDFS 然后使用 s3-dist-cp 将镶木地板上传到 S3 解决了这个问题

将其切换为使用动态分区覆盖模式使用:

.config("spark.sql.sources.partitionOverwriteMode", "dynamic")

另外,避免 DirectParquetOutputCommitter,不要修改它 - 使用 EMRFS 文件提交器将在速度方面取得更好的结果。