Spark 不推送过滤器(PushedFilters 数组为空)

Spark does not push filter (PushedFilters array is empty)

简介

我注意到我们项目中的 none 个推送过滤器起作用。它解释了为什么执行时间会受到影响,因为它读取了数百万次读取,而本应将其减少到几千次。为了调试这个问题,我编写了一个小测试来读取 CSV 文件、过滤内容(下推过滤器)和 return 结果。

它不适用于 CSV,因此我尝试读取镶木地板文件。 None 其中有效。

数据

people.csv 文件具有以下结构:

first_name,last_name,city  // header
FirstName1,LastName1,Bern // 1st row
FirstName2,LastName2,Sion // 2nd row
FirstName3,LastName3,Bulle // 3rd row

N.B: parquet 文件结构相同

读取 CSV 文件

为了重现该问题,我编写了一个读取 csv 文件的最小代码,应该 return 仅过滤数据。

读取csv文件并打印实物图:

Dataset<Row> ds = sparkSession.read().option("header", "true").csv(BASE_PATH+"people.csv");
ds.where(col("city").equalTo("Bern")).show();
ds.explain(true);

物理计划:

+----------+---------+----+
|first_name|last_name|city|
+----------+---------+----+
|FirstName1|LastName1|Bern|
+----------+---------+----+

== Parsed Logical Plan == Relation[first_name#10,last_name#11,city#12] csv

== Analyzed Logical Plan == first_name: string, last_name: string, city: string Relation[first_name#10,last_name#11,city#12] csv

== Optimized Logical Plan == Relation[first_name#10,last_name#11,city#12] csv

== Physical Plan == *(1) FileScan csv [first_name#10,last_name#11,city#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:people.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct

我已经用 parquet 文件进行了测试,不幸的是结果是一样的。

我们可以注意到的是:

我的问题是:为什么这个 PushedFilters 是空的?

N.B:

您正在对第一个数据集调用解释,该数据集只有读取。尝试类似的东西(抱歉,我只有可用的 Scala 环境):

val ds: DataFrame = spark.read.option("header", "true").csv("input.csv")
val f = ds.filter(col("city").equalTo("Bern"))

f.explain(true)

f.show()

此外,由于,使用类型化数据集API时要小心。不过不应该是你的情况。

只是为了文档,这里是解决方案(感谢 LizardKing):

结果

  • 之前:PushedFilters: []
  • 之后:PushedFilters: [IsNotNull(city), EqualTo(city,Bern)]

代码

Dataset<Row> ds = sparkSession.read().option("header", "true").csv(BASE_PATH+"people.csv");
Dataset<Row> dsFiltered = ds.where(col("city").equalTo("Bern"));
dsFiltered.explain(true);

物理计划

物理计划看起来好多了:

== Parsed Logical Plan ==
'Filter ('city = Bern)
+- Relation[first_name#10,last_name#11,city#12] csv

== Analyzed Logical Plan ==
first_name: string, last_name: string, city: string
Filter (city#12 = Bern)
+- Relation[first_name#10,last_name#11,city#12] csv

== Optimized Logical Plan ==
Filter (isnotnull(city#12) && (city#12 = Bern))
+- Relation[first_name#10,last_name#11,city#12] csv

== Physical Plan ==
*(1) Project [first_name#10, last_name#11, city#12]
+- *(1) Filter (isnotnull(city#12) && (city#12 = Bern))
   +- *(1) FileScan csv [first_name#10,last_name#11,city#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:./people.csv], PartitionFilters: [], PushedFilters: [IsNotNull(city), EqualTo(city,Bern)], ReadSchema: struct<first_name:string,last_name:string,city:string>