PySpark SQL sum vs agg

In PySpark, which of the following approaches is better?
Does the second query offer any advantage/performance gain over the first one in PySpark (in cluster mode)?

    #1) without using agg
    total_distance_df = spark.sql("SELECT sum(distance) FROM flights")\
        .withColumnRenamed('sum(CAST(distance AS DOUBLE))', 'total_distance')
    total_distance_df.show()

    Vs

    #2) using agg
    total_distance_df = spark.sql("SELECT distance FROM flights")\
                             .agg({"distance":"sum"})\
                             .withColumnRenamed("sum(distance)","total_distance")
    total_distance_df.show()

Both are the same. Check the explain plan of each query to see if there is any difference.

Example:

#sample df
df1.show()
+---+--------+
| id|distance|
+---+--------+
|  a|       1|
|  b|       2|
+---+--------+

df1.createOrReplaceTempView("tmp")

spark.sql("SELECT sum(distance) FROM tmp").withColumnRenamed('sum(CAST(distance AS DOUBLE))', 'total_distance').explain()
#== Physical Plan ==
#*(2) HashAggregate(keys=[], functions=[sum(distance#179L)])
#+- Exchange SinglePartition
#   +- *(1) HashAggregate(keys=[], functions=[partial_sum(distance#179L)])
#      +- *(1) Project [distance#179L]
#         +- Scan ExistingRDD[id#178,distance#179L]

spark.sql("SELECT distance FROM tmp").agg({"distance":"sum"}).explain()
#== Physical Plan ==
#*(2) HashAggregate(keys=[], functions=[sum(distance#179L)])
#+- Exchange SinglePartition
#   +- *(1) HashAggregate(keys=[], functions=[partial_sum(distance#179L)])
#      +- *(1) Project [distance#179L]
#         +- Scan ExistingRDD[id#178,distance#179L]

As you can see, the plans for sum and agg are identical.
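
As a side note, here is a minimal sketch (assuming the same df1 DataFrame with a numeric distance column as in the example above) of the pure DataFrame-API equivalent. Using alias on the aggregate expression avoids having to rename the auto-generated column afterwards, and it produces the same physical plan:

    from pyspark.sql import functions as F

    # assumes df1 has a numeric `distance` column, as in the example above
    total_distance_df = df1.agg(F.sum("distance").alias("total_distance"))
    total_distance_df.explain()  # same HashAggregate + Exchange SinglePartition plan
    total_distance_df.show()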