What is ExternalRDDScan in the DAG?
What does ExternalRDDScan mean in the DAG? There is no explanation for it anywhere on the internet.
Based on the source, ExternalRDDScan is the representation of converting an existing RDD of arbitrary objects into a Dataset of InternalRows, i.e. of creating a DataFrame. Let's verify that this understanding is correct:
scala> import spark.implicits._
import spark.implicits._
scala> val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> rdd.toDF().explain()
== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#2]
+- Scan ExternalRDDScan[obj#1]
ExternalRDD is the logical representation of a DataFrame/Dataset (though not in all cases) in the query execution plan, i.e. in the DAG created by Spark. In the physical plan above, Scan ExternalRDDScan produces the raw JVM objects held by the RDD, and SerializeFromObject converts them into Spark's internal row format.
An ExternalRDD is created:
- when you create a DataFrame from an RDD (i.e. using createDataFrame() or toDF(); see the sketch below)
- when you create a Dataset from an RDD (i.e. using createDataset() or toDS())
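For example (a hypothetical REPL snippet; the name pairRDD is just for illustration), you can check which logical node a given construction path produces by inspecting queryExecution.logical. Note that not every overload goes through ExternalRDD: createDataFrame(rowRDD, schema), which takes an RDD[Row] plus an explicit schema, is planned as a LogicalRDD instead, hence the caveat "though not in all cases" above.
scala> val pairRDD = sc.parallelize(Seq((1, "a"), (2, "b")))
scala> spark.createDataFrame(pairRDD).queryExecution.logical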
At runtime, when the ExternalRDD has to be loaded into memory, a scan operation is performed. It is represented by ExternalRDDScan (internally, the scan strategy resolves it to ExternalRDDScanExec). Look at the example below:
scala> val sampleRDD = sc.parallelize(Seq(1,2,3,4,5))
sampleRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> sampleRDD.toDF.queryExecution
res0: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#2]
+- ExternalRDD [obj#1]
== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#2]
+- ExternalRDD [obj#1]
== Optimized Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#2]
+- ExternalRDD [obj#1]
== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#2]
+- Scan[obj#1]
You can see that in the query execution plan, the DataFrame object is
represented by ExternalRDD and the physical plan contains a scan
operation which is resolved to ExternalRDDScan (ExternalRDDScanExec)
during its execution.
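To double-check that the scan leaf really is ExternalRDDScanExec, you can inspect the executed plan directly (a quick sketch, assuming a Spark 2.x shell, where collectLeaves() returns the leaf operators of a plan tree):
scala> sampleRDD.toDF.queryExecution.executedPlan.collectLeaves().map(_.getClass.getSimpleName)
This should report ExternalRDDScanExec as the only leaf of the physical plan.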
The same holds for Spark Datasets:
scala> val sampleRDD = sc.parallelize(Seq(1,2,3,4,5))
sampleRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> sampleRDD.toDS.queryExecution.logical
res9: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [input[0, int, false] AS value#23]
+- ExternalRDD [obj#22]
scala> spark.createDataset(sampleRDD).queryExecution.logical
res18: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [input[0, int, false] AS value#39]
+- ExternalRDD [obj#38]
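As a contrast (a hedged sketch; exact operator names and expression ids vary between Spark versions), a DataFrame built from a local Seq never goes through an RDD of objects: it is planned as a LocalRelation, so its physical plan shows LocalTableScan instead of ExternalRDDScan:
scala> Seq(1, 2, 3).toDF("value").explain()
== Physical Plan ==
LocalTableScan [value#x]
In other words, ExternalRDDScan in the DAG is a sign that the DataFrame/Dataset originated from an existing RDD rather than from a local collection or an external data source.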
The examples above were run on Spark version 2.4.2.
Reference: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-LogicalPlan-ExternalRDD.html