如何在 Spark 中获取 hive table 的分区信息
how to get the partitions info of hive table in Spark
我想像这样通过 Spark 执行 SQL。
sparkSession.sql("select * from table")
但我想在执行前对 table 进行分区检查,避免全扫描。
如果 table 是分区的 table,我的程序将强制用户添加分区过滤器。如果不是,可以 运行.
所以我的问题是如何知道 table 是否是分区的 table?
我的想法是从 Metastore 读取信息。但是如何获取 Metastore 是我遇到的另一个问题。有人可以帮忙吗?
您可以使用 Scala 的 Try
class 并在所需的 table 上执行 show partitions
。
val numPartitions = Try(spark.sql("show partitions database.table").count) match {
case Success(v) => v
case Failure(e) => -1
}
稍后您可以检查numPartitions
。如果值为 -1
,则 table 未分区。
假设你的真正目标是限制无界查询的执行,我认为获取查询的执行计划并查看其 FileScan
/ HiveTableScan
叶节点下是否有更容易正在应用分区过滤器。顺便说一下,对于分区 tables,查询实际要扫描的分区数也会被显示出来。所以,应该这样做:
scala> val df_unbound = spark.sql("select * from hottab")
df_unbound: org.apache.spark.sql.DataFrame = [id: int, descr: string ... 1 more field]
scala> val plan1 = df_unbound.queryExecution.executedPlan.toString
plan1: String =
"*(1) FileScan parquet default.hottab[id#0,descr#1,loaddate#2] Batched: true, Format: Parquet,
Location: CatalogFileIndex[hdfs://ns1/user/hive/warehouse/hottab],
PartitionCount: 365, PartitionFilters: [],
PushedFilters: [], ReadSchema: struct<id:int,descr:string>
"
scala> val df_filtered = spark.sql("select * from hottab where loaddate='2019-07-31'")
df_filtered: org.apache.spark.sql.DataFrame = [id: int, descr: string ... 1 more field]
scala> val plan2 = df_filtered.queryExecution.executedPlan.toString
plan2: String =
"*(1) FileScan parquet default.hottab[id#17,descr#18,loaddate#19] Batched: true, Format: Parquet,
Location: PrunedInMemoryFileIndex[hdfs://ns1/user/hive/warehouse/hottab/loaddate=2019-07-31],
PartitionCount: 1, PartitionFilters: [isnotnull(loaddate#19), (loaddate#19 = 2019-07-31)],
PushedFilters: [], ReadSchema: struct<id:int,descr:string>
"
这样,您也不必处理 SQL 解析以从查询中查找 table 名称,也不必自己查询 Metastore。
作为奖励,除了分区修剪之外,您还可以查看 "regular" 过滤器下推是否发生(对于支持它的存储格式)。
val listPartitions = spark.sessionState.catalog.listPartitionNames(TableIdentifier("table_name", Some("db name")))
listPartitions: Seq[String] = ArrayBuffer(partition1=value1, ... ) // partition table
listPartitions: Seq[String] = ArrayBuffer() // not partition table
我想像这样通过 Spark 执行 SQL。
sparkSession.sql("select * from table")
但我想在执行前对 table 进行分区检查,避免全扫描。
如果 table 是分区的 table,我的程序将强制用户添加分区过滤器。如果不是,可以 运行.
所以我的问题是如何知道 table 是否是分区的 table? 我的想法是从 Metastore 读取信息。但是如何获取 Metastore 是我遇到的另一个问题。有人可以帮忙吗?
您可以使用 Scala 的 Try
class 并在所需的 table 上执行 show partitions
。
val numPartitions = Try(spark.sql("show partitions database.table").count) match {
case Success(v) => v
case Failure(e) => -1
}
稍后您可以检查numPartitions
。如果值为 -1
,则 table 未分区。
假设你的真正目标是限制无界查询的执行,我认为获取查询的执行计划并查看其 FileScan
/ HiveTableScan
叶节点下是否有更容易正在应用分区过滤器。顺便说一下,对于分区 tables,查询实际要扫描的分区数也会被显示出来。所以,应该这样做:
scala> val df_unbound = spark.sql("select * from hottab")
df_unbound: org.apache.spark.sql.DataFrame = [id: int, descr: string ... 1 more field]
scala> val plan1 = df_unbound.queryExecution.executedPlan.toString
plan1: String =
"*(1) FileScan parquet default.hottab[id#0,descr#1,loaddate#2] Batched: true, Format: Parquet,
Location: CatalogFileIndex[hdfs://ns1/user/hive/warehouse/hottab],
PartitionCount: 365, PartitionFilters: [],
PushedFilters: [], ReadSchema: struct<id:int,descr:string>
"
scala> val df_filtered = spark.sql("select * from hottab where loaddate='2019-07-31'")
df_filtered: org.apache.spark.sql.DataFrame = [id: int, descr: string ... 1 more field]
scala> val plan2 = df_filtered.queryExecution.executedPlan.toString
plan2: String =
"*(1) FileScan parquet default.hottab[id#17,descr#18,loaddate#19] Batched: true, Format: Parquet,
Location: PrunedInMemoryFileIndex[hdfs://ns1/user/hive/warehouse/hottab/loaddate=2019-07-31],
PartitionCount: 1, PartitionFilters: [isnotnull(loaddate#19), (loaddate#19 = 2019-07-31)],
PushedFilters: [], ReadSchema: struct<id:int,descr:string>
"
这样,您也不必处理 SQL 解析以从查询中查找 table 名称,也不必自己查询 Metastore。
作为奖励,除了分区修剪之外,您还可以查看 "regular" 过滤器下推是否发生(对于支持它的存储格式)。
val listPartitions = spark.sessionState.catalog.listPartitionNames(TableIdentifier("table_name", Some("db name")))
listPartitions: Seq[String] = ArrayBuffer(partition1=value1, ... ) // partition table
listPartitions: Seq[String] = ArrayBuffer() // not partition table