在 PySpark/Delta 个数据帧上高效执行

Efficient execution on PySpark/Delta dataframes

在 Databricks 上使用 pyspark/Delta 个湖,我有以下场景:

sdf = spark.read.format("delta").table("...")
result = sdf.filter(...).groupBy(...).agg(...)

analysis_1 = result.groupBy(...).count() # transformation performed here
analysis_2 = result.groupBy(...).count() # transformation performed here

据我了解 Spark with Delta lakes,由于链式执行,result 实际上不是在声明时计算的,而是在使用时计算的。

然而,在这个例子中,它被多次使用,因此最昂贵的转换被计算了多次。

是否可以在代码中的某个点强制执行,例如

sdf = spark.read.format("delta").table("...")
result = sdf.filter(...).groupBy(...).agg(...)
result.force() # transformation performed here??

analysis_1 = result.groupBy(...).count() # quick smaller transformation??
analysis_2 = result.groupBy(...).count() # quick smaller transformation??

我的问题看的满满的,还是说不清楚。但是,如果您是 Spark 的新手,情况可能就是这样。

所以:

关于 .force 的使用,请参阅 https://blog.knoldus.com/getting-lazy-with-scala/。force 将无法在数据集或数据帧上工作。

这与 pyspark 或 Delta Lake 方法有关吗?不不不

analysis_1 = result.groupBy(...).count() # quick smaller transformation?? 
  • 这实际上是一个在最有可能导致洗牌之前进行转换的操作。

所以,我认为您的意思是,正如我们尊敬的 pault 所说,以下内容:

  • .cache 或 .persist

我怀疑你需要:

result.cache 

这意味着您的 第二个操作 analysis_2 不需要重新计算回到这里显示的来源 a

(2) Spark Jobs
Job 16 View(Stages: 3/3)
Stage 43: 
8/8
succeeded / total tasks 
Stage 44: 
200/200
succeeded / total tasks 
Stage 45:   
1/1
succeeded / total tasks 
Job 17 View(Stages: 2/2, 1 skipped)
Stage 46: 
0/8
succeeded / total tasks skipped
Stage 47: 
200/200
succeeded / total tasks 
Stage 48:   
1/1
succeeded / total tasks 

随着对 Spark 的改进,shuffle 分区在某些情况下仍然会导致跳过阶段,特别是对于 RDD。对于数据帧,需要缓存才能获得我观察到的跳过阶段效果。