为什么 dataset.count() 比 rdd.count() 快?

Why dataset.count() is faster than rdd.count()?

我创建了一个 Spark Dataset[Long]:

scala> val ds = spark.range(100000000)
ds: org.apache.spark.sql.Dataset[Long] = [id: bigint]

当我 运行 ds.count 它给了我 0.2s 的结果(在 4 核 8GB 机器上)。另外,它创建的 DAG 如下:

但是,当我 运行 ds.rdd.count 它给了我 4s 的结果(同一台机器)。但是它创建的DAG是这样的:

所以,我的疑惑是:

  1. 为什么 ds.rdd.count 只创建了一个阶段,而 ds.count 却创建了 2 个阶段?
  2. 此外,当 ds.rdd.count 只有一个阶段时,为什么它比具有 2 个阶段的 ds.count 慢?

Why ds.rdd.count is creating only one stage whereas ds.count is creating 2 stages ?

两个计数实际上都是两步操作。不同的是,在ds.count的情况下,最终聚合是由其中一个执行者执行的,而ds.rdd.countaggregates the final result on the driver,因此这一步不会反映在DAG中:

Also, when ds.rdd.count is having only one stage then why it is slower

同上。此外 ds.rdd.count 必须初始化(以及后来的垃圾收集)1 亿个 Row 对象,这几乎是免费的并且可能占这里时间差异的大部分。

最后 range-like 对象不是一个好的基准测试工具,除非非常小心地使用。根据上下文,超出范围的计数可以表示为恒定时间操作,即使没有显式优化也可以非常快(参见示例 spark.sparkContext.range(0, 100000000).count),但不能反映实际工作负载的性能。

相关: