为什么使用 sql 表达式进行过滤比在 Spark 中的 DataSet 中应用的函数更好

why filtering using sql expression is better compared to a function applied in DataSet in Spark

我正在阅读 spark ultimat guid 书,上面写着:

by specifying a function we are forcing spark to evaluate this function on every row in our dataSet...For simple filters it is always preferred to write sql expression

我不明白为什么 sql 表达式会更好,因为表达式也会应用于数据集的每一行!!
谁能给我更多细节?

通过使用列表达式,Spark 的优化器有机会优化查询,因为它可以查看 "into" 过滤器并可能将其移动到更好的位置以缩短执行时间。

示例:

Image 你有一个由两列 iddata 组成的数据集,你的逻辑将首先按 id 列对数据集进行分组,然后总结 data值。在这个分组操作之后,只应保留 id = 2 的组。在这种情况下,先过滤再求和会更快。通过将过滤器实现为列表达式,Spark 可以检测到此优化并首先应用过滤器:

val dfParquet = spark.read.parquet(<path to data>)
val groupedDf = dfParquet.groupBy("id").sum("data")
val groupedDfWithColumnFilter = groupedDf.filter("id = 2")
val groupedDfWithFilterFunction = groupedDf.filter(_.get(0).equals(2))

如果我们检查 groupedDfWithColumnFilter 的执行计划,我们会得到

== Physical Plan ==
HashAggregate(keys=[id#0L], functions=[sum(data#1L)])
+- Exchange hashpartitioning(id#0L, 200)
   +- HashAggregate(keys=[id#0L], functions=[partial_sum(data#1L)])
      +- Project [id#0L, data#1L]
         +- Filter (isnotnull(id#0L) && (id#0L = 2))
            +- FileScan parquet [id#0L,data#1L] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:.../id], PartitionFilters: [], 
                 PushedFilters: [IsNotNull(id), EqualTo(id,2)], ReadSchema: struct

所以首先应用过滤器,甚至向下推送到 parquet 文件 reader。

然而 groupedDfWithFilterFunction 的执行计划显示 Spark 无法进行此优化并将过滤器应用为最后一步,因此失去了优化:

== Physical Plan ==
Filter <function1>.apply
+- HashAggregate(keys=[id#0L], functions=[sum(data#1L)])
   +- Exchange hashpartitioning(id#0L, 200)
      +- HashAggregate(keys=[id#0L], functions=[partial_sum(data#1L)])
         +- FileScan parquet [id#0L,data#1L] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:.../id], PartitionFilters: [], 
              PushedFilters: [], ReadSchema: struct


另一种查看差异的方法是查看 Spark UI。对于我的测试用例,我创建了一个镶木地板文件,其中包含 100 个分区中的 10 个 mio 行。 在 SQL 选项卡中,可以看到由于 groupedDfWithColumnFilter Spark 的下推过滤器仅从磁盘加载大约 200K 行数据,而对于 groupedDfWithFilterFunction Spark 需要加载所有 10 mio 行: