在 Spark 中保存中间结果

Saving intermediate result in Spark

我正在使用 Spark SQL 1.6.0 创建处理管道。该管道由 steps/transformations 组成,一步的输出转发到下一步。在最后一步之后,结果 DataFrame 保存在 HDFS 中。我还需要在一些中间步骤中保存结果。执行此操作的代码为:

saveDataFrame(flushPath, flushFormat, isCoalesce, flushMode, previousDataFrame, sqlContext)
previousDataFrame

这里,previousDataFrame是最后一步的结果,saveDataFrame只是将DataFrame保存到给定的位置,然后previousDataFrame将被next steps/transformation使用。最后在最后一步之后,它将保存在 HDFS 中。 saveDataFrame 的代码是:

implicit def saveDataFrame(path: String, format: String, isCoalesce: Boolean, saveMode: SaveMode, dataFrame: DataFrame, sqlContext: SQLContext): Unit = {
val source = if (isCoalesce) dataFrame.coalesce(1) else dataFrame
if (format.equalsIgnoreCase("csv")) {
  source
    .write
    .mode(saveMode)
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .save(path)
}
else if (format.equalsIgnoreCase("parquet") || format.equalsIgnoreCase("json")) {
  source
    .write
    .mode(SaveMode.Overwrite)
    .format(format)
    .save(path)
}
else {
  throw new Exception("%s input format is not supported".format(format))
}}

这很好用,只是 spark 应用程序花费的时间比平时长。如果在 20 分钟内保存中间输出应用程序 运行s,那么使用此代码需要 1 小时。尽管作业和任务根据 Spark UI 在 20 分钟内完成,但 spark 提交过程继续 运行 直到 1 小时。

请帮忙算出结果。我还尝试了以下 2 种可能的解决方案:

问题出在 AWS S3 路径上,导致执行延迟。当我开始将输出保存到 HDFS 时,执行时间减少了。