Spark with Hive 是否可以将项目阶段推送到 HiveTableScan?

Spark with Hive is it possible to push Project phase to HiveTableScan?

我正在使用 Spark SQL 查询 Hive 中以 ORC 格式存储的数据。

当我对提供给 spark.sql(query) 的查询进行 运行 解释命令时,我看到以下查询计划:

== Physical Plan ==
*Project [col1, col2, col3]
+- *Filter (....)
   +- HiveTableScan [col1, col2, col3, ...col50]

据我所知,从 Hive 查询所有 50 列,然后才在 Spark 中进行过滤,后记仅选择所需的实际列。

是否可以将所需的列直接下推到 Hive,这样它们就不会一直加载到 Spark?

检查以下属性是否设置为默认值或 false?

spark.sql("SET spark.sql.orc.enabled=true");
spark.sql("SET spark.sql.hive.convertMetastoreOrc=true")
spark.sql("SET spark.sql.orc.filterPushdown=true")

这些可以帮助您避免读取不必要的列,并在您的数据分布在其中时利用 Hive orc table 的分区修剪 hdfs 上的不同分区。

将以上属性设置为 'true' 并查看您的解释计划显示的内容。

您还可以使用 Spark 的 orc 格式分区修剪获益,因为它不需要扫描整个 table 并且可以限制 Spark 在查询时需要的分区数量。它将有助于减少磁盘 input/ouput 操作。

例如:

我在下面的语句运行 中从 Hive orc 文件格式 table 创建一个数据框,该文件在列 'country' & 'tran_date'.

上分区
df=spark.sql("""select transaction_date,payment_type,city from test_dev_db.transactionmainhistorytable where country ='United Kingdom' and tran_date='2009-06-01' """)

给定的 table 有多个分区,如果我们查看上述查询的物理计划,我们可以看到它只扫描了一个分区。

== Physical Plan ==
*(1) Project [transaction_date#69, payment_type#72, city#74]
+- *(1) FileScan orc test_dev_db.transactionmainhistorytable[transaction_date#69,payment_type#72,city#74,country#76,tran_date#77] 
Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://host/user/vikct001/dev/hadoop/database/test_dev..., 
*PartitionCount: 1,* PartitionFilters: [isnotnull(country#76), isnotnull(tran_date#77), (country#76 = United Kingdom), (tran_date#77 = 2..., 
PushedFilters: [], ReadSchema: struct<transaction_date:timestamp,payment_type:string,city:string>

请参阅 "PartitionCount: 1" 并将 PartitionFilters 设置为不为空。

同样,如果您在查询中指定了任何过滤器,则可以下推过滤器。 在这里,就像我使用城市列来过滤数据。

df=spark.sql("""select transaction_date,payment_type,city from test_dev_db.transactionmainhistorytable where country ='United Kingdom' and tran_date='2009-06-01' and city='London' """)


== Physical Plan ==
*(1) Project [transaction_date#104, payment_type#107, city#109]
+- *(1) Filter (isnotnull(city#109) && (city#109 = London))
   +- *(1) FileScan orc test_dev_db.transactionmainhistorytable[transaction_date#104,payment_type#107,city#109,country#111,tran_date#112] 
   Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://host/user/vikct001/dev/hadoop/database/test_dev..., 
   PartitionCount: 1, PartitionFilters: [isnotnull(country#111), isnotnull(tran_date#112), (country#111 = United Kingdom), (tran_date#112..., 
   PushedFilters: [IsNotNull(city), EqualTo(city,London)], ReadSchema: struct<transaction_date:timestamp,payment_type:string,city:string>

上面可以看到 PushedFilters 不为空,它有一个国家列,其中有一个特定的值需要过滤。