为什么 persist() 和 cache() 方法会缩短 Spark 中的 DataFrame 计划?
Why do the persist() and cache() methods shorten DataFrame plan in Spark?
我正在使用 spark 版本 3.0.1。我正在生成一个大数据框。在最后的计算中,我将数据帧计划保存为 json 格式。我需要他。
但是有一个问题。如果我保留一个 DataFrame,那么它的 json 格式的计划将被完全截断。也就是说,所有数据沿袭都消失了。
比如我是这样做的:
val myDf: DataFrame = ???
val myPersistDf = myDf.persist
//toJSON method cuts down my plan
val jsonPlan = myPersistDf.queryExecution.optimizedPlan.toJSON
因此,只保留有关当前列的信息。
但是比如你用的是3.1.2版本的spark,那么就没有这个问题。也就是说,计划没有被削减。
另外值得一提的是,如果不调用toJSON方法,那么计划就不切了:
// Plan is not being cut.
val textPlan = myPersistDf.queryExecution.optimizedPlan.toString
我做了一个小测试项目来确保这一点:
https://github.com/MinorityMeaning/CutPlanDataFrame
请帮我想想办法。
我需要获得 json 格式的完整计划。
UPD(1):
现在我正在尝试将每个节点分别转换为 json。现在还不是很完美,但我认为我们需要朝这个方向努力。
val jsonPlan = s"[${getJson(result_df.queryExecution.optimizedPlan).mkString(",")}]"
def getJson(lp: TreeNode[_]): Seq[String] = {
val children = (lp.innerChildren ++ lp.children.map(c => c.asInstanceOf[TreeNode[_]])).distinct
JsonMethods.compact(JsonMethods.render(JsonMethods.parse(lp.toJSON)(0))) +:
getJson(t.asInstanceOf[TreeNode[_]])))
children.flatMap(t => getJson(t))
}
UPD(2):
好的,这就是我最终解决这个问题的方法。
我从 github 下载了 spark 3.0.1。然后用spark 3.1.2的文件替换了这个项目中的TreeNode class。重新编译了项目。
结果收到了一个包裹spark-catalyst_2.12-3.0.1.jar
取代了现有的原包装。
没有切换到另一个版本的 spark 的选项。我还没有找到任何其他解决问题的方法。
谢谢各位指点。您的建议很有帮助。
好吧,你已经有了答案。
TLDR:升级。
您可以,可以,查看逻辑计划代码中的 github repo,并寻找两者之间逻辑计划计算方式的差异。 3.0.1 和 3.1.2。 (或者查看 persist 的代码,看看它是如何更改的。)然后您可以将补丁返回 3.0.1。但是您仍然需要构建一个新版本的 spark,然后部署它以便返回计划。但是如果你正在做所有这些工作,为什么不升级到 3.1.2 如果你知道它有效呢? (或者更高版本的 Spark?)
(您必须依赖于仅与 3.0.1 兼容的 sub-component?)
您可以挑选以下 2 个提交到 spark 3.0.1 来解决这个问题。
* 1603775934 - [SPARK-35411][SQL][FOLLOWUP] Handle Currying Product while serializing TreeNode to JSON (8 months ago) <Tengfei Huang>
* 9804f07c17 - [SPARK-35411][SQL] Add essential information while serializing TreeNode to json (9 months ago) <Tengfei Huang>
我正在使用 spark 版本 3.0.1。我正在生成一个大数据框。在最后的计算中,我将数据帧计划保存为 json 格式。我需要他。 但是有一个问题。如果我保留一个 DataFrame,那么它的 json 格式的计划将被完全截断。也就是说,所有数据沿袭都消失了。
比如我是这样做的:
val myDf: DataFrame = ???
val myPersistDf = myDf.persist
//toJSON method cuts down my plan
val jsonPlan = myPersistDf.queryExecution.optimizedPlan.toJSON
因此,只保留有关当前列的信息。 但是比如你用的是3.1.2版本的spark,那么就没有这个问题。也就是说,计划没有被削减。 另外值得一提的是,如果不调用toJSON方法,那么计划就不切了:
// Plan is not being cut.
val textPlan = myPersistDf.queryExecution.optimizedPlan.toString
我做了一个小测试项目来确保这一点: https://github.com/MinorityMeaning/CutPlanDataFrame
请帮我想想办法。 我需要获得 json 格式的完整计划。
UPD(1):
现在我正在尝试将每个节点分别转换为 json。现在还不是很完美,但我认为我们需要朝这个方向努力。
val jsonPlan = s"[${getJson(result_df.queryExecution.optimizedPlan).mkString(",")}]"
def getJson(lp: TreeNode[_]): Seq[String] = {
val children = (lp.innerChildren ++ lp.children.map(c => c.asInstanceOf[TreeNode[_]])).distinct
JsonMethods.compact(JsonMethods.render(JsonMethods.parse(lp.toJSON)(0))) +:
getJson(t.asInstanceOf[TreeNode[_]])))
children.flatMap(t => getJson(t))
}
UPD(2):
好的,这就是我最终解决这个问题的方法。 我从 github 下载了 spark 3.0.1。然后用spark 3.1.2的文件替换了这个项目中的TreeNode class。重新编译了项目。 结果收到了一个包裹spark-catalyst_2.12-3.0.1.jar 取代了现有的原包装。
没有切换到另一个版本的 spark 的选项。我还没有找到任何其他解决问题的方法。 谢谢各位指点。您的建议很有帮助。
好吧,你已经有了答案。
TLDR:升级。
您可以,可以,查看逻辑计划代码中的 github repo,并寻找两者之间逻辑计划计算方式的差异。 3.0.1 和 3.1.2。 (或者查看 persist 的代码,看看它是如何更改的。)然后您可以将补丁返回 3.0.1。但是您仍然需要构建一个新版本的 spark,然后部署它以便返回计划。但是如果你正在做所有这些工作,为什么不升级到 3.1.2 如果你知道它有效呢? (或者更高版本的 Spark?)
(您必须依赖于仅与 3.0.1 兼容的 sub-component?)
您可以挑选以下 2 个提交到 spark 3.0.1 来解决这个问题。
* 1603775934 - [SPARK-35411][SQL][FOLLOWUP] Handle Currying Product while serializing TreeNode to JSON (8 months ago) <Tengfei Huang>
* 9804f07c17 - [SPARK-35411][SQL] Add essential information while serializing TreeNode to json (9 months ago) <Tengfei Huang>