Spark 不利用 parquet 的 hdfs 分区
Spark not leveraging hdfs partitioning with parquet
我正在使用以下命令将 parquet 文件写入 hdfs:
df.write.mode(SaveMode.Append).partitionBy(id).parquet(path)
之后我正在阅读和过滤文件,如下所示:
val file = sqlContext.read.parquet(folder)
val data = file.map(r => Row(r.getInt(4).toString, r.getString(0), r.getInt(1),
r.getLong(2), r.getString(3)))
val filteredData = data.filter(x => x.thingId.equals("1"))
filteredData.collect()
我预计 Spark 会利用文件的分区并且只读取 "thingId = 1" 的分区。
事实上,Spark 会读取文件的所有分区,而不仅仅是过滤后的分区(thingId=1 的分区)。
如果我查看日志,我可以看到它确实读取了所有内容:
16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from
hdfs://sandbox.hortonworks.com/path/id=1/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet
16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from
hdfs://sandbox.hortonworks.com/path/id=42/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet
16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from
hdfs://sandbox.hortonworks.com/path/id=17/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet
16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from
hdfs://sandbox.hortonworks.com/path/0833/id=33/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet
16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from
hdfs://sandbox.hortonworks.com/path/id=26/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet
16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from
hdfs://sandbox.hortonworks.com/path/id=12/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet
有什么我遗漏的吗?当我查看文档时,Spark 应该知道基于过滤器,它应该只读取 thingID=1 的分区。
你们有人知道问题出在哪里吗?
一些问题可能会阻止 Spark 成功“下推”谓词(即在输入格式级别使用过滤器):
过滤器下推关闭:根据您使用的 Spark 版本,谓词下推选项 (spark.sql.parquet.filterPushdown
) 可能会关闭。从 Spark 1.5.0 开始它默认开启 - 所以检查你的版本和你的配置
过滤器是“不透明的”:这里似乎是这种情况:您正在加载镶木地板文件,将每一行映射到另一行(重新排序列?),然后使用接受 函数 的 filter
方法。 Spark 无法“读取”函数代码并意识到它在分区列上使用比较 - 对于 Spark,这只是一个可以进行各种检查的 Row => Boolean
函数...
要使过滤器下推起作用,您需要在将记录映射到与原始结构“分离”的内容之前使用它,并使用 filter
使用过滤器的重载之一可由 Spark 解析,例如:
// assuming the relevant column name is "id" in the parquet structure
val filtered = file.filter("id = 1")
// or:
val filtered = file.filter(col("id") === 1)
// and only then:
val data = filtered.map(r => Row(...))
我正在使用以下命令将 parquet 文件写入 hdfs:
df.write.mode(SaveMode.Append).partitionBy(id).parquet(path)
之后我正在阅读和过滤文件,如下所示:
val file = sqlContext.read.parquet(folder)
val data = file.map(r => Row(r.getInt(4).toString, r.getString(0), r.getInt(1),
r.getLong(2), r.getString(3)))
val filteredData = data.filter(x => x.thingId.equals("1"))
filteredData.collect()
我预计 Spark 会利用文件的分区并且只读取 "thingId = 1" 的分区。 事实上,Spark 会读取文件的所有分区,而不仅仅是过滤后的分区(thingId=1 的分区)。 如果我查看日志,我可以看到它确实读取了所有内容:
16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/id=1/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/id=42/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/id=17/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/0833/id=33/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/id=26/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/id=12/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet
有什么我遗漏的吗?当我查看文档时,Spark 应该知道基于过滤器,它应该只读取 thingID=1 的分区。 你们有人知道问题出在哪里吗?
一些问题可能会阻止 Spark 成功“下推”谓词(即在输入格式级别使用过滤器):
过滤器下推关闭:根据您使用的 Spark 版本,谓词下推选项 (
spark.sql.parquet.filterPushdown
) 可能会关闭。从 Spark 1.5.0 开始它默认开启 - 所以检查你的版本和你的配置过滤器是“不透明的”:这里似乎是这种情况:您正在加载镶木地板文件,将每一行映射到另一行(重新排序列?),然后使用接受 函数 的
filter
方法。 Spark 无法“读取”函数代码并意识到它在分区列上使用比较 - 对于 Spark,这只是一个可以进行各种检查的Row => Boolean
函数...要使过滤器下推起作用,您需要在将记录映射到与原始结构“分离”的内容之前使用它,并使用
filter
使用过滤器的重载之一可由 Spark 解析,例如:// assuming the relevant column name is "id" in the parquet structure val filtered = file.filter("id = 1") // or: val filtered = file.filter(col("id") === 1) // and only then: val data = filtered.map(r => Row(...))