旧数据框的 spark select 列 returns 引用
spark select column returns reference of old dataframe
我使用以下代码:
random = [("ABC",xx, 1),
("DEF",yy,1),
("GHI",zz, 0)
]
randomColumns = ["name","id", "male"]
randomDF = spark.createDataFrame(data=random, schema = randomColumns)
test_df = randomDF.select("name", "id")
test_df.filter(f.col("male") == '1').show()
从上面的代码来看,我预计它会导致错误,因为对于 test_df 我没有 select 来自原始数据帧的男性列。令人惊讶的是,上面的查询运行得很好,没有任何错误,并输出以下内容:
+---------+-------+
|name | id|
+---------+-------+
| abc| xx|
| def| yy|
+---------+-------+
我想了解 spark 背后的逻辑。根据 spark 文档 Select returns 一个新的数据框。那为什么它仍然能够使用父数据框中的男性列。
这是Spark生成的DAG造成的。一些运算符(或 transformers
)是惰性执行的,因此它们为 Spark 优化 DAG 铺平了道路。
在这个例子中,有两个主要步骤:首先是select
(或者SQL行话中的project
),然后是filter
。但实际上,执行的时候,先filter
,再select
,因为效率更高
你可以通过explain()
方法验证这个结论:
test_df.filter(f.col("flag") == '1').explain()
它将输出:
== Physical Plan ==
*(1) Project [dept_name#0, dept_id#1L]
+- *(1) Filter (isnotnull(flag#2L) AND (flag#2L = 1))
+- *(1) Scan ExistingRDD[dept_name#0,dept_id#1L,flag#2L]
添加到@chenzhongpu 的回答中,请注意,如果您在 test_df
之上定义临时视图,查询将失败:
test_df.createOrReplaceTempView("test_df")
spark.sql("select * from test_df where flag = 1").show()
_Traceback (most recent call last): ...
:
pyspark.sql.utils.AnalysisException: u"cannot resolve '`flag`' given input columns: [test_df.dept, test_df.id]; line 1 pos 24;
'Project [*]
+- 'Filter ('flag = 1)
+- SubqueryAlias `test_df`
+- Project [dept#0, id#2L]
+- LogicalRDD [dept#0, flag#1L, id#2L], false
_
...因为 select
(执行计划中的=Project
节点)将先于过滤器(尝试通过 where
子句)。
我使用以下代码:
random = [("ABC",xx, 1),
("DEF",yy,1),
("GHI",zz, 0)
]
randomColumns = ["name","id", "male"]
randomDF = spark.createDataFrame(data=random, schema = randomColumns)
test_df = randomDF.select("name", "id")
test_df.filter(f.col("male") == '1').show()
从上面的代码来看,我预计它会导致错误,因为对于 test_df 我没有 select 来自原始数据帧的男性列。令人惊讶的是,上面的查询运行得很好,没有任何错误,并输出以下内容:
+---------+-------+
|name | id|
+---------+-------+
| abc| xx|
| def| yy|
+---------+-------+
我想了解 spark 背后的逻辑。根据 spark 文档 Select returns 一个新的数据框。那为什么它仍然能够使用父数据框中的男性列。
这是Spark生成的DAG造成的。一些运算符(或 transformers
)是惰性执行的,因此它们为 Spark 优化 DAG 铺平了道路。
在这个例子中,有两个主要步骤:首先是select
(或者SQL行话中的project
),然后是filter
。但实际上,执行的时候,先filter
,再select
,因为效率更高
你可以通过explain()
方法验证这个结论:
test_df.filter(f.col("flag") == '1').explain()
它将输出:
== Physical Plan ==
*(1) Project [dept_name#0, dept_id#1L]
+- *(1) Filter (isnotnull(flag#2L) AND (flag#2L = 1))
+- *(1) Scan ExistingRDD[dept_name#0,dept_id#1L,flag#2L]
添加到@chenzhongpu 的回答中,请注意,如果您在 test_df
之上定义临时视图,查询将失败:
test_df.createOrReplaceTempView("test_df")
spark.sql("select * from test_df where flag = 1").show()
_Traceback (most recent call last): ...
:
pyspark.sql.utils.AnalysisException: u"cannot resolve '`flag`' given input columns: [test_df.dept, test_df.id]; line 1 pos 24;
'Project [*]
+- 'Filter ('flag = 1)
+- SubqueryAlias `test_df`
+- Project [dept#0, id#2L]
+- LogicalRDD [dept#0, flag#1L, id#2L], false
_
...因为 select
(执行计划中的=Project
节点)将先于过滤器(尝试通过 where
子句)。