将大文件写入 S3 的最佳方法是什么?

What's the best way to write a big file to S3?

我正在使用 zeppelin 和 spark,我想从 S3 中获取一个 2TB 的文件并在 Spark 中对其进行 运行 转换,然后将其发送到 S3 以便我可以使用Jupyter Notebook 中的文件。转换非常简单。

我正在将文件作为镶木地板文件读取。我想大约是2TB,但我不确定如何验证。

大约10M行5列,相当大了。

我试过my_table.write.parquet(s3path),我试过my_table.write.option("maxRecordsPerFile", 200000).parquet(s3path)。我如何想出正确的方法来编写一个大的 parquet 文件?

这些是您可以考虑的要点...

1) maxRecordsPerFile 设置:

my_table.write.parquet(s3path)

Spark 为每个任务写出一个文件。

保存的文件数=保存的RDD/Dataframe的分区数。因此,这可能会导致文件大得离谱(当然你可以重新分区你的数据并保存重新分区意味着跨网络随机播放数据)。

限制每个文件的记录数

my_table.write.option("maxRecordsPerFile", numberOfRecordsPerFile..yourwish).parquet(s3path)

它可以避免生成巨大的文件。

2) 如果您使用的是 AWS Emr (Emrfs),这可能是您可以考虑的要点之一。

emr-spark-s3-optimized-committer

当未使用 EMRFS S3 优化提交器时:

  • 使用 S3A 文件系统时。
  • 当使用 Parquet 以外的输出格式时,例如 ORC 或文本。

3) 使用压缩技术、算法版本和其他 spark 配置:

.config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", 2)
.config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", true)
.config("spark.hadoop.parquet.enable.summary-metadata", false)
.config("spark.sql.parquet.mergeSchema", false)
.config("spark.sql.parquet.filterPushdown", true) // for reading purpose 
.config("mapreduce.fileoutputcommitter.algorithm.version", "2")
.config("spark.sql.parquet.compression.codec", "snappy")
.getOrCreate()

4) 如果您使用的是 s3a,则可以快速上传和其他道具:

  .config("spark.hadoop.fs.s3a.fast.upload","true")
  .config("spark.hadoop.fs.s3a.fast.upload","true")
  .config("spark.hadoop.fs.s3a.connection.timeout","100000")
  .config("spark.hadoop.fs.s3a.attempts.maximum","10")
  .config("spark.hadoop.fs.s3a.fast.upload","true")
  .config("spark.hadoop.fs.s3a.fast.upload.buffer","bytebuffer")
  .config("spark.hadoop.fs.s3a.fast.upload.active.blocks","4")
  .config("fs.s3a.connection.ssl.enabled", "true")
  1. S3a 连接器将增量写入块,但 hadoop-2 中随 spark 一起提供的(过时的)版本。7.x 处理得不是很好。如果可以,请将 所有 hadoop- Jars 更新为 2.8.5 或 2.9.x。
  2. 选项"fs.s3a.multipart.size 控制块的大小。有 10K 块的限制,因此您可以上传的最大文件是该大小 * 10,000。对于非常大的文件,使用比默认值“64M”更大的数字