PySpark SQL sum vs agg
In PySpark, which of the following approaches is better? Does the second query have any advantage or performance gain over the first one (in cluster mode)?
#1) without using agg
total_distance_df = spark.sql("SELECT sum(distance) FROM flights")\
.withColumnRenamed('sum(CAST(distance AS DOUBLE))', 'total_distance')
total_distance_df.show()

vs.

#2) with using agg
total_distance_df = spark.sql("SELECT distance FROM flights")\
.agg({"distance": "sum"})\
.withColumnRenamed("sum(distance)", "total_distance")
total_distance_df.show()
Both are the same; check the explain query plans to see any differences.
Example:
#sample df
df1.show()
+---+--------+
| id|distance|
+---+--------+
| a| 1|
| b| 2|
+---+--------+
df1.createOrReplaceTempView("tmp")
spark.sql("SELECT sum(distance) FROM tmp").withColumnRenamed('sum(CAST(distance AS DOUBLE))', 'total_distance').explain()
#== Physical Plan ==
#*(2) HashAggregate(keys=[], functions=[sum(distance#179L)])
#+- Exchange SinglePartition
# +- *(1) HashAggregate(keys=[], functions=[partial_sum(distance#179L)])
# +- *(1) Project [distance#179L]
# +- Scan ExistingRDD[id#178,distance#179L]
spark.sql("SELECT distance FROM tmp").agg({"distance":"sum"}).explain()
#== Physical Plan ==
#*(2) HashAggregate(keys=[], functions=[sum(distance#179L)])
#+- Exchange SinglePartition
# +- *(1) HashAggregate(keys=[], functions=[partial_sum(distance#179L)])
# +- *(1) Project [distance#179L]
# +- Scan ExistingRDD[id#178,distance#179L]
As you can see, the plans for the SQL sum and agg versions are identical, so there is no performance difference between the two.