spark如何计算数据框中的记录数?

How does spark calculates number of records in a dataframe?

我知道 df.count() 会触发一个 spark 动作并且 return 数据框中存在的记录数,但我想知道这个过程在内部是如何工作的,spark 会贯穿整个过程dataframe 来计算记录的数量,或者是否有任何其他优化技术,例如在 dataframe 的元数据中存储值?

我正在使用 pyspark 3.2.1。

看起来 运行 df.count() 实际上使用了 Count aggregation class. I am basing this on the definition of the count method in Dataset.scala.

  /**
   * Returns the number of rows in the Dataset.
   * @group action
   * @since 1.6.0
   */
  def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
    plan.executeCollect().head.getLong(0)
  }

is there any other optimised technique like storing value in dataframe's metadata?

它将采用 Catalyst 使用的所有相同优化策略。它创建一个有向表达式图,计算并汇总它们。它没有将计数值存储为元数据,这将违反 Spark 的惰性评估原则。

我 运行 进行了实验并验证了 df.count()df.groupBy().count() 产生了相同的物理计划。

df = spark.createDataFrame(pd.DataFrame({"a": [1,2,3], "b": ["a", "b", "c"]}))

# count using the Dataframe method
df.count()

# count using the Count aggregator
cnt_agg = df.groupBy().count()

这两个工作产生了相同的物理计划:

== Physical Plan ==
AdaptiveSparkPlan (9)
+- == Final Plan ==
   * HashAggregate (6)
   +- ShuffleQueryStage (5), Statistics(sizeInBytes=64.0 B, rowCount=4, isRuntime=true)
      +- Exchange (4)
         +- * HashAggregate (3)
            +- * Project (2)
               +- * Scan ExistingRDD (1)
+- == Initial Plan ==
   HashAggregate (8)
   +- Exchange (7)
      +- HashAggregate (3)
         +- Project (2)
            +- Scan ExistingRDD (1)