旧数据框的 spark select 列 returns 引用

spark select column returns reference of old dataframe

我使用以下代码:

random = [("ABC",xx, 1), 
          ("DEF",yy,1), 
          ("GHI",zz, 0) 
         ]
randomColumns = ["name","id", "male"]
randomDF = spark.createDataFrame(data=random, schema = randomColumns)
test_df = randomDF.select("name", "id")
test_df.filter(f.col("male") == '1').show()

从上面的代码来看,我预计它会导致错误,因为对于 test_df 我没有 select 来自原始数据帧的男性列。令人惊讶的是,上面的查询运行得很好,没有任何错误,并输出以下内容:

+---------+-------+
|name     |     id|
+---------+-------+
|      abc|     xx|
|      def|     yy|
+---------+-------+

我想了解 spark 背后的逻辑。根据 spark 文档 Select returns 一个新的数据框。那为什么它仍然能够使用父数据框中的男性列。

这是Spark生成的DAG造成的。一些运算符(或 transformers)是惰性执行的,因此它们为 Spark 优化 DAG 铺平了道路。

在这个例子中,有两个主要步骤:首先是select(或者SQL行话中的project),然后是filter。但实际上,执行的时候,先filter,再select,因为效率更高

你可以通过explain()方法验证这个结论:

test_df.filter(f.col("flag") == '1').explain()

它将输出:

== Physical Plan ==
*(1) Project [dept_name#0, dept_id#1L]
+- *(1) Filter (isnotnull(flag#2L) AND (flag#2L = 1))
   +- *(1) Scan ExistingRDD[dept_name#0,dept_id#1L,flag#2L]

添加到@chenzhongpu 的回答中,请注意,如果您在 test_df 之上定义临时视图,查询将失败:

test_df.createOrReplaceTempView("test_df")
spark.sql("select * from test_df where flag = 1").show()
_Traceback (most recent call last): ...
:
pyspark.sql.utils.AnalysisException: u"cannot resolve '`flag`' given input columns: [test_df.dept, test_df.id]; line 1 pos 24;
'Project [*]
 +- 'Filter ('flag = 1)
   +- SubqueryAlias `test_df`
      +- Project [dept#0, id#2L]
         +- LogicalRDD [dept#0, flag#1L, id#2L], false
 _

...因为 select(执行计划中的=Project 节点)将先于过滤器(尝试通过 where 子句)。