What is ExternalRDDScan in the DAG?

I couldn't find an explanation of it anywhere on the internet.

Based on the source code, ExternalRDDScan is a representation of converting an existing RDD of arbitrary objects into a dataset of InternalRows, i.e., of creating a DataFrame. Let's verify that this understanding is correct:

scala> import spark.implicits._
import spark.implicits._

scala> val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> rdd.toDF().explain()
== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#2]
+- Scan ExternalRDDScan[obj#1]

ExternalRDD is the logical representation of a DataFrame/Dataset (though not in all cases) in the query execution plan, i.e., in the DAG created by Spark.
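The "though not in all cases" caveat can be checked directly: a DataFrame built from a local collection instead of an RDD is planned as a LocalRelation, so its scan should show up as LocalTableScan rather than ExternalRDDScan. A minimal sketch in spark-shell (output omitted here; the exact plan text may vary by Spark version):

scala> Seq(1, 2, 3).toDF().explain()
// Expected: the physical plan's leaf is a LocalTableScan,
// not ExternalRDDScan, because no external RDD of objects is involved.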

An ExternalRDD is created (both paths are sketched below):

  1. when you create a DataFrame from an RDD (i.e., using createDataFrame() or toDF())
  2. when you create a Dataset from an RDD (i.e., using createDataset() or toDS())
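A minimal sketch of both paths, assuming a running spark-shell session (the pairs RDD is an illustrative name; the expression IDs printed in the plans will differ per session):

scala> import spark.implicits._
scala> val pairs = sc.parallelize(Seq((1, "a"), (2, "b")))
scala> spark.createDataFrame(pairs).queryExecution.logical  // path 1: DataFrame from an RDD
// Expected: a logical plan containing an ExternalRDD [obj#...] leaf
scala> pairs.toDS().queryExecution.logical                  // path 2: Dataset from an RDD
// Expected: likewise backed by an ExternalRDD leaf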

At runtime, when the ExternalRDD has to be loaded into memory, a scan operation is performed; it is represented by ExternalRDDScan (internally, the scan strategy resolves it to ExternalRDDScanExec). Look at the example below:

scala> val sampleRDD = sc.parallelize(Seq(1,2,3,4,5))
sampleRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> sampleRDD.toDF.queryExecution
res0: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#2]
+- ExternalRDD [obj#1]

== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#2]
+- ExternalRDD [obj#1]

== Optimized Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#2]
+- ExternalRDD [obj#1]

== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#2]
+- Scan[obj#1]

You can see that in the query execution plan the DataFrame is represented by ExternalRDD, and the physical plan contains a scan operation that is resolved to ExternalRDDScan (ExternalRDDScanExec) at execution time.
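To confirm this programmatically rather than by reading the plan string, you can collect the scan node from the executed plan. A hedged sketch, assuming Spark 2.4, where the node class ExternalRDDScanExec lives in org.apache.spark.sql.execution:

scala> import org.apache.spark.sql.execution.ExternalRDDScanExec
scala> val physical = sampleRDD.toDF.queryExecution.executedPlan
scala> physical.collect { case scan: ExternalRDDScanExec[_] => scan.getClass.getSimpleName }
// Expected: Seq("ExternalRDDScanExec"), i.e., the scan in the plan above
// is indeed an ExternalRDDScanExec at execution time.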

The same holds for a Spark Dataset.

scala> val sampleRDD = sc.parallelize(Seq(1,2,3,4,5))
sampleRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> sampleRDD.toDS.queryExecution.logical
res9: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [input[0, int, false] AS value#23]
+- ExternalRDD [obj#22]

scala> spark.createDataset(sampleRDD).queryExecution.logical
res18: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [input[0, int, false] AS value#39]
+- ExternalRDD [obj#38]

The examples above were run on Spark version 2.4.2.

Reference: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-LogicalPlan-ExternalRDD.html