spark如何计算数据框中的记录数?
How does spark calculates number of records in a dataframe?
我知道 df.count() 会触发一个 spark 动作并且 return 数据框中存在的记录数,但我想知道这个过程在内部是如何工作的,spark 会贯穿整个过程dataframe 来计算记录的数量,或者是否有任何其他优化技术,例如在 dataframe 的元数据中存储值?
我正在使用 pyspark 3.2.1。
看起来 运行 df.count()
实际上使用了 Count aggregation class. I am basing this on the definition of the count
method in Dataset.scala.
/**
* Returns the number of rows in the Dataset.
* @group action
* @since 1.6.0
*/
def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
plan.executeCollect().head.getLong(0)
}
is there any other optimised technique like storing value in
dataframe's metadata?
它将采用 Catalyst 使用的所有相同优化策略。它创建一个有向表达式图,计算并汇总它们。它没有将计数值存储为元数据,这将违反 Spark 的惰性评估原则。
我 运行 进行了实验并验证了 df.count()
和 df.groupBy().count()
产生了相同的物理计划。
df = spark.createDataFrame(pd.DataFrame({"a": [1,2,3], "b": ["a", "b", "c"]}))
# count using the Dataframe method
df.count()
# count using the Count aggregator
cnt_agg = df.groupBy().count()
这两个工作产生了相同的物理计划:
== Physical Plan ==
AdaptiveSparkPlan (9)
+- == Final Plan ==
* HashAggregate (6)
+- ShuffleQueryStage (5), Statistics(sizeInBytes=64.0 B, rowCount=4, isRuntime=true)
+- Exchange (4)
+- * HashAggregate (3)
+- * Project (2)
+- * Scan ExistingRDD (1)
+- == Initial Plan ==
HashAggregate (8)
+- Exchange (7)
+- HashAggregate (3)
+- Project (2)
+- Scan ExistingRDD (1)
我知道 df.count() 会触发一个 spark 动作并且 return 数据框中存在的记录数,但我想知道这个过程在内部是如何工作的,spark 会贯穿整个过程dataframe 来计算记录的数量,或者是否有任何其他优化技术,例如在 dataframe 的元数据中存储值?
我正在使用 pyspark 3.2.1。
看起来 运行 df.count()
实际上使用了 Count aggregation class. I am basing this on the definition of the count
method in Dataset.scala.
/**
* Returns the number of rows in the Dataset.
* @group action
* @since 1.6.0
*/
def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
plan.executeCollect().head.getLong(0)
}
is there any other optimised technique like storing value in dataframe's metadata?
它将采用 Catalyst 使用的所有相同优化策略。它创建一个有向表达式图,计算并汇总它们。它没有将计数值存储为元数据,这将违反 Spark 的惰性评估原则。
我 运行 进行了实验并验证了 df.count()
和 df.groupBy().count()
产生了相同的物理计划。
df = spark.createDataFrame(pd.DataFrame({"a": [1,2,3], "b": ["a", "b", "c"]}))
# count using the Dataframe method
df.count()
# count using the Count aggregator
cnt_agg = df.groupBy().count()
这两个工作产生了相同的物理计划:
== Physical Plan ==
AdaptiveSparkPlan (9)
+- == Final Plan ==
* HashAggregate (6)
+- ShuffleQueryStage (5), Statistics(sizeInBytes=64.0 B, rowCount=4, isRuntime=true)
+- Exchange (4)
+- * HashAggregate (3)
+- * Project (2)
+- * Scan ExistingRDD (1)
+- == Initial Plan ==
HashAggregate (8)
+- Exchange (7)
+- HashAggregate (3)
+- Project (2)
+- Scan ExistingRDD (1)