s3 上的 Spark Dataset Parquet 分区创建临时文件夹

Spark Dataset Parquet partition on s3 creating temporary folder

Spark(version=2.2.0) 没有DirectParquetOutputCommitter。作为替代方案,我可以使用

dataset
    .option("mapreduce.fileoutputcommitter.algorithm.version", "2")//magic here
    .parquet("s3a://...")

避免在 S3 上创建 _temporary 文件夹。

一切正常,直到我为我的数据集设置 partitionBy

dataset
    .partitionBy("a", "b")
    .option("mapreduce.fileoutputcommitter.algorithm.version", "2")//magic stop working creating _temporary on S3
    .parquet("s3a://...")

也尝试添加但没有成功

spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

在 Spark 数据集上使用 partitionBy,它将创建 _temporary 并移动文件,这将成为一个非常缓慢的操作。

有任何替代配置或缺少配置吗?

备选方案(按推荐顺序 + 易用性 - 最佳):

  1. 使用 Netflix 的 S3Committer:https://github.com/rdblue/s3committer/
  2. 写入 HDFS,然后复制到 S3(例如通过 s3distcp)
  3. 不要使用 partitionBy,而是迭代所有分区排列并将结果动态写入每个分区目录
  4. 编写自定义文件提交器

Hadoop 3.1 的 s3a 将内置一个零重命名提交程序,(va HADOOP-13786)。在那之前,您可以使用它的前身,is from netflix

请注意,"algorithm 2" 并不是消除 _temp 目录的神奇步骤,只是在提交单个任务时将任务输出直接重命名为目标。如果目录列表中存在延迟一致性并且仍然是 O(data),仍然容易出错。您不能安全地将 v1 或 v2 提交器直接与 S3 一起使用,而不是与 Hadoop 中附带的 S3A 连接器一起使用 2.x