在 PySpark/Delta 个数据帧上高效执行
Efficient execution on PySpark/Delta dataframes
在 Databricks 上使用 pyspark/Delta 个湖,我有以下场景:
sdf = spark.read.format("delta").table("...")
result = sdf.filter(...).groupBy(...).agg(...)
analysis_1 = result.groupBy(...).count() # transformation performed here
analysis_2 = result.groupBy(...).count() # transformation performed here
据我了解 Spark with Delta lakes,由于链式执行,result
实际上不是在声明时计算的,而是在使用时计算的。
然而,在这个例子中,它被多次使用,因此最昂贵的转换被计算了多次。
是否可以在代码中的某个点强制执行,例如
sdf = spark.read.format("delta").table("...")
result = sdf.filter(...).groupBy(...).agg(...)
result.force() # transformation performed here??
analysis_1 = result.groupBy(...).count() # quick smaller transformation??
analysis_2 = result.groupBy(...).count() # quick smaller transformation??
我的问题看的满满的,还是说不清楚。但是,如果您是 Spark 的新手,情况可能就是这样。
所以:
关于 .force 的使用,请参阅 https://blog.knoldus.com/getting-lazy-with-scala/。force 将无法在数据集或数据帧上工作。
这与 pyspark 或 Delta Lake 方法有关吗?不不不
analysis_1 = result.groupBy(...).count() # quick smaller transformation??
- 这实际上是一个在最有可能导致洗牌之前进行转换的操作。
所以,我认为您的意思是,正如我们尊敬的 pault 所说,以下内容:
- .cache 或 .persist
我怀疑你需要:
result.cache
这意味着您的 第二个操作 analysis_2 不需要重新计算回到这里显示的来源 a
(2) Spark Jobs
Job 16 View(Stages: 3/3)
Stage 43:
8/8
succeeded / total tasks
Stage 44:
200/200
succeeded / total tasks
Stage 45:
1/1
succeeded / total tasks
Job 17 View(Stages: 2/2, 1 skipped)
Stage 46:
0/8
succeeded / total tasks skipped
Stage 47:
200/200
succeeded / total tasks
Stage 48:
1/1
succeeded / total tasks
随着对 Spark 的改进,shuffle 分区在某些情况下仍然会导致跳过阶段,特别是对于 RDD。对于数据帧,需要缓存才能获得我观察到的跳过阶段效果。
在 Databricks 上使用 pyspark/Delta 个湖,我有以下场景:
sdf = spark.read.format("delta").table("...")
result = sdf.filter(...).groupBy(...).agg(...)
analysis_1 = result.groupBy(...).count() # transformation performed here
analysis_2 = result.groupBy(...).count() # transformation performed here
据我了解 Spark with Delta lakes,由于链式执行,result
实际上不是在声明时计算的,而是在使用时计算的。
然而,在这个例子中,它被多次使用,因此最昂贵的转换被计算了多次。
是否可以在代码中的某个点强制执行,例如
sdf = spark.read.format("delta").table("...")
result = sdf.filter(...).groupBy(...).agg(...)
result.force() # transformation performed here??
analysis_1 = result.groupBy(...).count() # quick smaller transformation??
analysis_2 = result.groupBy(...).count() # quick smaller transformation??
我的问题看的满满的,还是说不清楚。但是,如果您是 Spark 的新手,情况可能就是这样。
所以:
关于 .force 的使用,请参阅 https://blog.knoldus.com/getting-lazy-with-scala/。force 将无法在数据集或数据帧上工作。
这与 pyspark 或 Delta Lake 方法有关吗?不不不
analysis_1 = result.groupBy(...).count() # quick smaller transformation??
- 这实际上是一个在最有可能导致洗牌之前进行转换的操作。
所以,我认为您的意思是,正如我们尊敬的 pault 所说,以下内容:
- .cache 或 .persist
我怀疑你需要:
result.cache
这意味着您的 第二个操作 analysis_2 不需要重新计算回到这里显示的来源 a
(2) Spark Jobs
Job 16 View(Stages: 3/3)
Stage 43:
8/8
succeeded / total tasks
Stage 44:
200/200
succeeded / total tasks
Stage 45:
1/1
succeeded / total tasks
Job 17 View(Stages: 2/2, 1 skipped)
Stage 46:
0/8
succeeded / total tasks skipped
Stage 47:
200/200
succeeded / total tasks
Stage 48:
1/1
succeeded / total tasks
随着对 Spark 的改进,shuffle 分区在某些情况下仍然会导致跳过阶段,特别是对于 RDD。对于数据帧,需要缓存才能获得我观察到的跳过阶段效果。