Spark 不推送过滤器(PushedFilters 数组为空)
Spark does not push filter (PushedFilters array is empty)
简介
我注意到我们项目中的 none 个推送过滤器起作用。它解释了为什么执行时间会受到影响,因为它读取了数百万次读取,而本应将其减少到几千次。为了调试这个问题,我编写了一个小测试来读取 CSV 文件、过滤内容(下推过滤器)和 return 结果。
它不适用于 CSV,因此我尝试读取镶木地板文件。 None 其中有效。
数据
people.csv
文件具有以下结构:
first_name,last_name,city // header
FirstName1,LastName1,Bern // 1st row
FirstName2,LastName2,Sion // 2nd row
FirstName3,LastName3,Bulle // 3rd row
N.B: parquet 文件结构相同
读取 CSV 文件
为了重现该问题,我编写了一个读取 csv 文件的最小代码,应该 return 仅过滤数据。
读取csv文件并打印实物图:
Dataset<Row> ds = sparkSession.read().option("header", "true").csv(BASE_PATH+"people.csv");
ds.where(col("city").equalTo("Bern")).show();
ds.explain(true);
物理计划:
+----------+---------+----+
|first_name|last_name|city|
+----------+---------+----+
|FirstName1|LastName1|Bern|
+----------+---------+----+
== Parsed Logical Plan == Relation[first_name#10,last_name#11,city#12] csv
== Analyzed Logical Plan == first_name: string, last_name: string, city: string Relation[first_name#10,last_name#11,city#12] csv
== Optimized Logical Plan == Relation[first_name#10,last_name#11,city#12] csv
== Physical Plan ==
*(1) FileScan csv [first_name#10,last_name#11,city#12] Batched: false, Format: CSV, Location:
InMemoryFileIndex[file:people.csv],
PartitionFilters: [], PushedFilters: [], ReadSchema:
struct
我已经用 parquet 文件进行了测试,不幸的是结果是一样的。
我们可以注意到的是:
- PushedFilters 是空的,我希望过滤器包含谓词。
- returned 结果仍然正确。
我的问题是:为什么这个 PushedFilters 是空的?
N.B:
- Spark-版本: 2.4.3
- 文件系统:ext4(和集群上的 HDFS,都没有工作)
您正在对第一个数据集调用解释,该数据集只有读取。尝试类似的东西(抱歉,我只有可用的 Scala 环境):
val ds: DataFrame = spark.read.option("header", "true").csv("input.csv")
val f = ds.filter(col("city").equalTo("Bern"))
f.explain(true)
f.show()
此外,由于,使用类型化数据集API时要小心。不过不应该是你的情况。
只是为了文档,这里是解决方案(感谢 LizardKing):
结果
- 之前:
PushedFilters: []
- 之后:
PushedFilters: [IsNotNull(city), EqualTo(city,Bern)]
代码
Dataset<Row> ds = sparkSession.read().option("header", "true").csv(BASE_PATH+"people.csv");
Dataset<Row> dsFiltered = ds.where(col("city").equalTo("Bern"));
dsFiltered.explain(true);
物理计划
物理计划看起来好多了:
== Parsed Logical Plan ==
'Filter ('city = Bern)
+- Relation[first_name#10,last_name#11,city#12] csv
== Analyzed Logical Plan ==
first_name: string, last_name: string, city: string
Filter (city#12 = Bern)
+- Relation[first_name#10,last_name#11,city#12] csv
== Optimized Logical Plan ==
Filter (isnotnull(city#12) && (city#12 = Bern))
+- Relation[first_name#10,last_name#11,city#12] csv
== Physical Plan ==
*(1) Project [first_name#10, last_name#11, city#12]
+- *(1) Filter (isnotnull(city#12) && (city#12 = Bern))
+- *(1) FileScan csv [first_name#10,last_name#11,city#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:./people.csv], PartitionFilters: [], PushedFilters: [IsNotNull(city), EqualTo(city,Bern)], ReadSchema: struct<first_name:string,last_name:string,city:string>
简介
我注意到我们项目中的 none 个推送过滤器起作用。它解释了为什么执行时间会受到影响,因为它读取了数百万次读取,而本应将其减少到几千次。为了调试这个问题,我编写了一个小测试来读取 CSV 文件、过滤内容(下推过滤器)和 return 结果。
它不适用于 CSV,因此我尝试读取镶木地板文件。 None 其中有效。
数据
people.csv
文件具有以下结构:
first_name,last_name,city // header
FirstName1,LastName1,Bern // 1st row
FirstName2,LastName2,Sion // 2nd row
FirstName3,LastName3,Bulle // 3rd row
N.B: parquet 文件结构相同
读取 CSV 文件
为了重现该问题,我编写了一个读取 csv 文件的最小代码,应该 return 仅过滤数据。
读取csv文件并打印实物图:
Dataset<Row> ds = sparkSession.read().option("header", "true").csv(BASE_PATH+"people.csv");
ds.where(col("city").equalTo("Bern")).show();
ds.explain(true);
物理计划:
+----------+---------+----+
|first_name|last_name|city|
+----------+---------+----+
|FirstName1|LastName1|Bern|
+----------+---------+----+== Parsed Logical Plan == Relation[first_name#10,last_name#11,city#12] csv
== Analyzed Logical Plan == first_name: string, last_name: string, city: string Relation[first_name#10,last_name#11,city#12] csv
== Optimized Logical Plan == Relation[first_name#10,last_name#11,city#12] csv
== Physical Plan == *(1) FileScan csv [first_name#10,last_name#11,city#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:people.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
我已经用 parquet 文件进行了测试,不幸的是结果是一样的。
我们可以注意到的是:
- PushedFilters 是空的,我希望过滤器包含谓词。
- returned 结果仍然正确。
我的问题是:为什么这个 PushedFilters 是空的?
N.B:
- Spark-版本: 2.4.3
- 文件系统:ext4(和集群上的 HDFS,都没有工作)
您正在对第一个数据集调用解释,该数据集只有读取。尝试类似的东西(抱歉,我只有可用的 Scala 环境):
val ds: DataFrame = spark.read.option("header", "true").csv("input.csv")
val f = ds.filter(col("city").equalTo("Bern"))
f.explain(true)
f.show()
此外,由于
只是为了文档,这里是解决方案(感谢 LizardKing):
结果
- 之前:
PushedFilters: []
- 之后:
PushedFilters: [IsNotNull(city), EqualTo(city,Bern)]
代码
Dataset<Row> ds = sparkSession.read().option("header", "true").csv(BASE_PATH+"people.csv");
Dataset<Row> dsFiltered = ds.where(col("city").equalTo("Bern"));
dsFiltered.explain(true);
物理计划
物理计划看起来好多了:
== Parsed Logical Plan ==
'Filter ('city = Bern)
+- Relation[first_name#10,last_name#11,city#12] csv
== Analyzed Logical Plan ==
first_name: string, last_name: string, city: string
Filter (city#12 = Bern)
+- Relation[first_name#10,last_name#11,city#12] csv
== Optimized Logical Plan ==
Filter (isnotnull(city#12) && (city#12 = Bern))
+- Relation[first_name#10,last_name#11,city#12] csv
== Physical Plan ==
*(1) Project [first_name#10, last_name#11, city#12]
+- *(1) Filter (isnotnull(city#12) && (city#12 = Bern))
+- *(1) FileScan csv [first_name#10,last_name#11,city#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:./people.csv], PartitionFilters: [], PushedFilters: [IsNotNull(city), EqualTo(city,Bern)], ReadSchema: struct<first_name:string,last_name:string,city:string>