Spark 中长谱系 (DAG) 的问题
Issues with long lineages (DAG) in Spark
我们通常使用Spark作为存储在S3或HDFS上的数据的处理引擎。我们使用 Databricks 和 EMR 平台。
我经常面临的问题之一是当任务规模增大时,工作性能会严重下降。例如,假设我从五个具有不同转换级别(过滤、分解、连接等)的表中读取数据,从这些转换中合并数据子集,然后进行进一步处理(例如,根据以下条件删除一些行需要窗口函数等),然后是其他一些处理阶段,最后将最终输出保存到目标 s3 路径。如果我们 运行 没有它,这项工作需要很长时间。但是,如果我们将临时中间数据帧保存(暂存)到 S3 并将保存的(在 S3 上)数据帧用于下一步的查询,则作业完成得更快。有没有人有类似的经历?有没有比检查点更好的方法来处理这种长任务谱系?
更奇怪的是,对于较长的谱系,spark 会抛出一个预期的错误,如未找到列,而如果临时暂存中间结果,则相同的代码可以正常工作。
通过保存数据框或使用检查点写入中间数据是修复它的唯一方法。您可能 运行 遇到优化器需要很长时间才能生成计划的问题。 quickest/most 解决此问题的有效方法是使用 localCheckpoint。这在本地实现了一个检查点。
val df = df.localCheckpoint()
我们通常使用Spark作为存储在S3或HDFS上的数据的处理引擎。我们使用 Databricks 和 EMR 平台。 我经常面临的问题之一是当任务规模增大时,工作性能会严重下降。例如,假设我从五个具有不同转换级别(过滤、分解、连接等)的表中读取数据,从这些转换中合并数据子集,然后进行进一步处理(例如,根据以下条件删除一些行需要窗口函数等),然后是其他一些处理阶段,最后将最终输出保存到目标 s3 路径。如果我们 运行 没有它,这项工作需要很长时间。但是,如果我们将临时中间数据帧保存(暂存)到 S3 并将保存的(在 S3 上)数据帧用于下一步的查询,则作业完成得更快。有没有人有类似的经历?有没有比检查点更好的方法来处理这种长任务谱系?
更奇怪的是,对于较长的谱系,spark 会抛出一个预期的错误,如未找到列,而如果临时暂存中间结果,则相同的代码可以正常工作。
通过保存数据框或使用检查点写入中间数据是修复它的唯一方法。您可能 运行 遇到优化器需要很长时间才能生成计划的问题。 quickest/most 解决此问题的有效方法是使用 localCheckpoint。这在本地实现了一个检查点。
val df = df.localCheckpoint()