在 Spark 中保存中间结果
Saving intermediate result in Spark
我正在使用 Spark SQL 1.6.0 创建处理管道。该管道由 steps/transformations 组成,一步的输出转发到下一步。在最后一步之后,结果 DataFrame 保存在 HDFS 中。我还需要在一些中间步骤中保存结果。执行此操作的代码为:
saveDataFrame(flushPath, flushFormat, isCoalesce, flushMode, previousDataFrame, sqlContext)
previousDataFrame
这里,previousDataFrame是最后一步的结果,saveDataFrame只是将DataFrame保存到给定的位置,然后previousDataFrame将被next steps/transformation使用。最后在最后一步之后,它将保存在 HDFS 中。 saveDataFrame 的代码是:
implicit def saveDataFrame(path: String, format: String, isCoalesce: Boolean, saveMode: SaveMode, dataFrame: DataFrame, sqlContext: SQLContext): Unit = {
val source = if (isCoalesce) dataFrame.coalesce(1) else dataFrame
if (format.equalsIgnoreCase("csv")) {
source
.write
.mode(saveMode)
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.save(path)
}
else if (format.equalsIgnoreCase("parquet") || format.equalsIgnoreCase("json")) {
source
.write
.mode(SaveMode.Overwrite)
.format(format)
.save(path)
}
else {
throw new Exception("%s input format is not supported".format(format))
}}
这很好用,只是 spark 应用程序花费的时间比平时长。如果在 20 分钟内保存中间输出应用程序 运行s,那么使用此代码需要 1 小时。尽管作业和任务根据 Spark UI 在 20 分钟内完成,但 spark 提交过程继续 运行 直到 1 小时。
请帮忙算出结果。我还尝试了以下 2 种可能的解决方案:
- 使用Future创建多线程调用saveDataFrame
- 在保存之前缓存先前的 DataFrame 并将其重新用于下一步。
问题出在 AWS S3 路径上,导致执行延迟。当我开始将输出保存到 HDFS 时,执行时间减少了。
我正在使用 Spark SQL 1.6.0 创建处理管道。该管道由 steps/transformations 组成,一步的输出转发到下一步。在最后一步之后,结果 DataFrame 保存在 HDFS 中。我还需要在一些中间步骤中保存结果。执行此操作的代码为:
saveDataFrame(flushPath, flushFormat, isCoalesce, flushMode, previousDataFrame, sqlContext)
previousDataFrame
这里,previousDataFrame是最后一步的结果,saveDataFrame只是将DataFrame保存到给定的位置,然后previousDataFrame将被next steps/transformation使用。最后在最后一步之后,它将保存在 HDFS 中。 saveDataFrame 的代码是:
implicit def saveDataFrame(path: String, format: String, isCoalesce: Boolean, saveMode: SaveMode, dataFrame: DataFrame, sqlContext: SQLContext): Unit = {
val source = if (isCoalesce) dataFrame.coalesce(1) else dataFrame
if (format.equalsIgnoreCase("csv")) {
source
.write
.mode(saveMode)
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.save(path)
}
else if (format.equalsIgnoreCase("parquet") || format.equalsIgnoreCase("json")) {
source
.write
.mode(SaveMode.Overwrite)
.format(format)
.save(path)
}
else {
throw new Exception("%s input format is not supported".format(format))
}}
这很好用,只是 spark 应用程序花费的时间比平时长。如果在 20 分钟内保存中间输出应用程序 运行s,那么使用此代码需要 1 小时。尽管作业和任务根据 Spark UI 在 20 分钟内完成,但 spark 提交过程继续 运行 直到 1 小时。
请帮忙算出结果。我还尝试了以下 2 种可能的解决方案:
- 使用Future创建多线程调用saveDataFrame
- 在保存之前缓存先前的 DataFrame 并将其重新用于下一步。
问题出在 AWS S3 路径上,导致执行延迟。当我开始将输出保存到 HDFS 时,执行时间减少了。