How does the Spark DAG work when joining two DataFrames derived from the same parent?

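Suppose I have some transformations, as in the snippet below, and I want to join two DataFrames in Spark that are both derived from the same parent DataFrame. How will the DAG be optimized (or not) for these computations, and will the result of the initial read be kept and reused?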

val dataFrame = readDataframe() // .persist() ?
val derived1 = dataFrame.transform(/* transformation1 */)
val derived2 = dataFrame.transform(/* transformation2 */)
val result = derived1.join(derived2, /* condition */)
result.show()

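persist does not help here: because of lazy evaluation, nothing is actually computed anywhere in this code until result.show() runs. The physical plans below show that persisting does not simplify the plan. Both branches of the join still scan the parent separately; with persist, each branch simply reads through an InMemoryTableScan instead of reading Range directly.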

However, if you call an action such as .count() or .show() in between your transformations, you force Spark to evaluate the query, and persist does help in that case.

Without persist:

scala> val df = spark.range(10)
df: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> val df1 = df.transform(x => x.select($"id", $"id" * 2))
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, (id * 2): bigint]

scala> val df2 = df.transform(x => x.select($"id", $"id" + 2))
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, (id + 2): bigint]

scala> val result = df1.join(df2, "id")
result: org.apache.spark.sql.DataFrame = [id: bigint, (id * 2): bigint ... 1 more field]

scala> result.explain()
== Physical Plan ==
*(2) Project [id#8L, (id * 2)#15L, (id + 2)#18L]
+- *(2) BroadcastHashJoin [id#8L], [id#21L], Inner, BuildRight
   :- *(2) Project [id#8L, (id#8L * 2) AS (id * 2)#15L]
   :  +- *(2) Range (0, 10, step=1, splits=24)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#39]
      +- *(1) Project [id#21L, (id#21L + 2) AS (id + 2)#18L]
         +- *(1) Range (0, 10, step=1, splits=24)

With persist:

scala> val df0 = df.persist()
df0: df.type = [id: bigint]

scala> val df1 = df0.transform(x => x.select($"id", $"id" * 2))
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, (id * 2): bigint]

scala> val df2 = df0.transform(x => x.select($"id", $"id" + 2))
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, (id + 2): bigint]

scala> val result = df1.join(df2, "id")
result: org.apache.spark.sql.DataFrame = [id: bigint, (id * 2): bigint ... 1 more field]

scala> result.explain()
== Physical Plan ==
*(2) Project [id#8L, (id * 2)#50L, (id + 2)#53L]
+- *(2) BroadcastHashJoin [id#8L], [id#56L], Inner, BuildRight
   :- *(2) Project [id#8L, (id#8L * 2) AS (id * 2)#50L]
   :  +- *(2) ColumnarToRow
   :     +- InMemoryTableScan [id#8L]
   :           +- InMemoryRelation [id#8L], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                 +- *(1) Range (0, 10, step=1, splits=24)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#100]
      +- *(1) Project [id#56L, (id#56L + 2) AS (id + 2)#53L]
         +- *(1) ColumnarToRow
            +- InMemoryTableScan [id#56L]
                  +- InMemoryRelation [id#56L], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *(1) Range (0, 10, step=1, splits=24)
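
For the case mentioned earlier, where an action runs between the transformations, a minimal sketch might look like the following. The names parent, doubled, shifted and joined, the local master setting, and the app name are assumptions made for illustration; in spark-shell the spark session already exists and the builder lines can be dropped. Since persist is itself lazy, the cache is only filled by the first action that touches the persisted DataFrame.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Local session for illustration only (assumption); spark-shell provides `spark` already.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("persist-sketch")
  .getOrCreate()
import spark.implicits._

// Mark the parent for caching. Nothing is computed yet, because persist is lazy.
val parent = spark.range(10).toDF("id").persist(StorageLevel.MEMORY_AND_DISK)

// An action between the transformations forces evaluation and fills the cache.
val rows = parent.count()

// Both derived DataFrames now read the already-materialized cached parent.
val doubled = parent.select($"id", ($"id" * 2).as("doubled"))
val shifted = parent.select($"id", ($"id" + 2).as("shifted"))
val joined = doubled.join(shifted, "id")

joined.show()

// Release the cached blocks once they are no longer needed.
parent.unpersist()
```

With a parent as cheap as spark.range(10) the difference is negligible; the pattern is more likely to pay off when the parent comes from an expensive read or a long chain of transformations.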