附加新数据时如何避免从 S3 读取旧文件?
How to avoid reading old files from S3 when appending new data?
2 小时一次,spark 作业 运行 将一些 tgz 文件转换为 parquet。
该作业将新数据附加到 s3 中的现有镶木地板中:
df.write.mode("append").partitionBy("id","day").parquet("s3://myBucket/foo.parquet")
在 spark-submit 输出中,我可以看到大量时间花在阅读旧的 parquet 文件上,例如:
16/11/27 14:06:15 INFO S3NativeFileSystem: Opening 's3://myBucket/foo.parquet/id=123/day=2016-11-26/part-r-00003-b20752e9-5d70-43f5-b8b4-50b5b4d0c7da.snappy.parquet' for reading
16/11/27 14:06:15 INFO S3NativeFileSystem: Stream for key
'foo.parquet/id=123/day=2016-11-26/part-r-00003-e80419de-7019-4859-bbe7-dcd392f6fcd3.snappy.parquet'
seeking to position '149195444'
看起来这个操作每个文件花费不到 1 秒,但是文件的数量随着时间的推移而增加(每次追加都会添加新文件),这让我觉得我的代码将无法扩展。
如果我只需要附加新数据,有什么想法可以避免从 s3 读取旧的 parquet 文件吗?
我使用 EMR 4.8.2 和 DirectParquetOutputCommitter:
sc._jsc.hadoopConfiguration().set('spark.sql.parquet.output.committer.class', 'org.apache.spark.sql.parquet.DirectParquetOutputCommitter')
我通过将数据帧写入 EMR HDFS 然后使用 s3-dist-cp 将镶木地板上传到 S3 解决了这个问题
将其切换为使用动态分区覆盖模式使用:
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
另外,避免 DirectParquetOutputCommitter
,不要修改它 - 使用 EMRFS 文件提交器将在速度方面取得更好的结果。
2 小时一次,spark 作业 运行 将一些 tgz 文件转换为 parquet。 该作业将新数据附加到 s3 中的现有镶木地板中:
df.write.mode("append").partitionBy("id","day").parquet("s3://myBucket/foo.parquet")
在 spark-submit 输出中,我可以看到大量时间花在阅读旧的 parquet 文件上,例如:
16/11/27 14:06:15 INFO S3NativeFileSystem: Opening 's3://myBucket/foo.parquet/id=123/day=2016-11-26/part-r-00003-b20752e9-5d70-43f5-b8b4-50b5b4d0c7da.snappy.parquet' for reading
16/11/27 14:06:15 INFO S3NativeFileSystem: Stream for key 'foo.parquet/id=123/day=2016-11-26/part-r-00003-e80419de-7019-4859-bbe7-dcd392f6fcd3.snappy.parquet' seeking to position '149195444'
看起来这个操作每个文件花费不到 1 秒,但是文件的数量随着时间的推移而增加(每次追加都会添加新文件),这让我觉得我的代码将无法扩展。
如果我只需要附加新数据,有什么想法可以避免从 s3 读取旧的 parquet 文件吗?
我使用 EMR 4.8.2 和 DirectParquetOutputCommitter:
sc._jsc.hadoopConfiguration().set('spark.sql.parquet.output.committer.class', 'org.apache.spark.sql.parquet.DirectParquetOutputCommitter')
我通过将数据帧写入 EMR HDFS 然后使用 s3-dist-cp 将镶木地板上传到 S3 解决了这个问题
将其切换为使用动态分区覆盖模式使用:
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
另外,避免 DirectParquetOutputCommitter
,不要修改它 - 使用 EMRFS 文件提交器将在速度方面取得更好的结果。