如何在 Spark 中获取 hive table 的分区信息

how to get the partitions info of hive table in Spark

我想像这样通过 Spark 执行 SQL。

sparkSession.sql("select * from table")

但我想在执行前对 table 进行分区检查,避免全扫描。

如果 table 是分区的 table,我的程序将强制用户添加分区过滤器。如果不是,可以 运行.

所以我的问题是如何知道 table 是否是分区的 table? 我的想法是从 Metastore 读取信息。但是如何获取 Metastore 是我遇到的另一个问题。有人可以帮忙吗?

您可以使用 Scala 的 Try class 并在所需的 table 上执行 show partitions

val numPartitions = Try(spark.sql("show partitions database.table").count) match {
    case Success(v) => v
    case Failure(e) => -1
}

稍后您可以检查numPartitions。如果值为 -1,则 table 未分区。

假设你的真正目标是限制无界查询的执行,我认为获取查询的执行计划并查看其 FileScan / HiveTableScan 叶节点下是否有更容易正在应用分区过滤器。顺便说一下,对于分区 tables,查询实际要扫描的分区数也会被显示出来。所以,应该这样做:

scala> val df_unbound = spark.sql("select * from hottab")
df_unbound: org.apache.spark.sql.DataFrame = [id: int, descr: string ... 1 more field]

scala> val plan1 = df_unbound.queryExecution.executedPlan.toString
plan1: String =
"*(1) FileScan parquet default.hottab[id#0,descr#1,loaddate#2] Batched: true, Format: Parquet, 
Location: CatalogFileIndex[hdfs://ns1/user/hive/warehouse/hottab], 
PartitionCount: 365, PartitionFilters: [],
PushedFilters: [], ReadSchema: struct<id:int,descr:string>
"

scala> val df_filtered = spark.sql("select * from hottab where loaddate='2019-07-31'")
df_filtered: org.apache.spark.sql.DataFrame = [id: int, descr: string ... 1 more field]

scala> val plan2 = df_filtered.queryExecution.executedPlan.toString
plan2: String =
"*(1) FileScan parquet default.hottab[id#17,descr#18,loaddate#19] Batched: true, Format: Parquet, 
Location: PrunedInMemoryFileIndex[hdfs://ns1/user/hive/warehouse/hottab/loaddate=2019-07-31], 
PartitionCount: 1, PartitionFilters: [isnotnull(loaddate#19), (loaddate#19 = 2019-07-31)], 
PushedFilters: [], ReadSchema: struct<id:int,descr:string>
"

这样,您也不必处理 SQL 解析以从查询中查找 table 名称,也不必自己查询 Metastore。

作为奖励,除了分区修剪之外,您还可以查看 "regular" 过滤器下推是否发生(对于支持它的存储格式)。

  val listPartitions = spark.sessionState.catalog.listPartitionNames(TableIdentifier("table_name", Some("db name")))
  listPartitions: Seq[String] = ArrayBuffer(partition1=value1, ... )  // partition table
  listPartitions: Seq[String] = ArrayBuffer() // not partition table