从 Spark 将许多文件写入镶木地板 - 缺少一些镶木地板文件

Writing many files to parquet from Spark - Missing some parquet files

我们开发了一个作业,它使用 Spark 2.3 在 Amazon S3 (s3a) 的 parquet 中处理和写入大量文件。每个源文件都应该在 S3 中创建一个不同的分区。代码已经过测试(使用较少的文件)并按预期工作。

然而,在使用真实数据执行后,我们注意到一些文件(总数的一小部分)没有写入 parquet。日志中没有错误或任何奇怪的东西。我们再次测试了丢失文件的代码并且它有效 ¿?。我们想在生产环境中使用代码,但我们需要检测这里的问题是什么。我们正在这样写镶木地板:

dataframe_with_data_to_write.repartition($"field1", $"field2").write.option("compression", "snappy").option("basePath", path_out).partitionBy("field1", "field2", "year", "month", "day").mode(SaveMode.Append).parquet(path_out)

我们使用了推荐的参数:

spark.sparkContext.hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
spark.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")  
spark.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.cleanup-failures.ignored", "true")
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

是否存在使用此参数的已知错误问题?也许与 S3 最终一致性有关?有什么建议吗?

任何帮助将不胜感激。

是的,这是一个已知问题。通过在尝试工作目录中列出输出并重命名到目标目录来提交工作。如果该清单低估了文件:输出丢失。如果该清单列出了不存在的文件,则提交失败。

修复了 ASF Hadoop 版本。

  1. hadoop-2.7-2.8 连接器。写入HDFS,复制文件
  2. Hadoop 2.9-3.0 打开 S3Guard 以获得一致的 S3 列表(为此使用 DynamoDB)
  3. Hadoop 3.1,切换到 the S3A committers,它的设计考虑了一致性和性能问题。来自 netflix 的“staging”在此处使用起来最简单。

进一步阅读:A zero-rename committer

更新 11-01-2019,亚马逊有自己的 ASF 闭源实现 zero rename committer。向 EMR 团队询问他们自己的正确性证明,因为我们其他人无法验证这一点。

2020 年 12 月 11 日更新:Amazon S3 现在完全一致,因此列表将是最新且正确的;不再更新不一致和 404 缓存。

  • v1 提交算法仍然不安全,因为目录重命名是非原子的
  • v2 提交算法总是会出错,因为它会逐个重命名文件
  • 重命名是 S3 上的慢 O(data) 复制操作,因此 window 任务提交期间的失败更大。

您不再有数据丢失的风险,但除了性能糟糕之外,任务提交期间的失败也没有得到妥善处理