Spark with Hive 是否可以将项目阶段推送到 HiveTableScan?
Spark with Hive is it possible to push Project phase to HiveTableScan?
我正在使用 Spark SQL 查询 Hive 中以 ORC 格式存储的数据。
当我对提供给 spark.sql(query)
的查询进行 运行 解释命令时,我看到以下查询计划:
== Physical Plan ==
*Project [col1, col2, col3]
+- *Filter (....)
+- HiveTableScan [col1, col2, col3, ...col50]
据我所知,从 Hive 查询所有 50 列,然后才在 Spark 中进行过滤,后记仅选择所需的实际列。
是否可以将所需的列直接下推到 Hive,这样它们就不会一直加载到 Spark?
检查以下属性是否设置为默认值或 false?
spark.sql("SET spark.sql.orc.enabled=true");
spark.sql("SET spark.sql.hive.convertMetastoreOrc=true")
spark.sql("SET spark.sql.orc.filterPushdown=true")
这些可以帮助您避免读取不必要的列,并在您的数据分布在其中时利用 Hive orc table 的分区修剪
hdfs 上的不同分区。
将以上属性设置为 'true' 并查看您的解释计划显示的内容。
您还可以使用 Spark 的 orc 格式分区修剪获益,因为它不需要扫描整个 table 并且可以限制 Spark 在查询时需要的分区数量。它将有助于减少磁盘 input/ouput 操作。
例如:
我在下面的语句运行 中从 Hive orc 文件格式 table 创建一个数据框,该文件在列 'country'
& 'tran_date'
.
上分区
df=spark.sql("""select transaction_date,payment_type,city from test_dev_db.transactionmainhistorytable where country ='United Kingdom' and tran_date='2009-06-01' """)
给定的 table 有多个分区,如果我们查看上述查询的物理计划,我们可以看到它只扫描了一个分区。
== Physical Plan ==
*(1) Project [transaction_date#69, payment_type#72, city#74]
+- *(1) FileScan orc test_dev_db.transactionmainhistorytable[transaction_date#69,payment_type#72,city#74,country#76,tran_date#77]
Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://host/user/vikct001/dev/hadoop/database/test_dev...,
*PartitionCount: 1,* PartitionFilters: [isnotnull(country#76), isnotnull(tran_date#77), (country#76 = United Kingdom), (tran_date#77 = 2...,
PushedFilters: [], ReadSchema: struct<transaction_date:timestamp,payment_type:string,city:string>
请参阅 "PartitionCount: 1"
并将 PartitionFilters 设置为不为空。
同样,如果您在查询中指定了任何过滤器,则可以下推过滤器。
在这里,就像我使用城市列来过滤数据。
df=spark.sql("""select transaction_date,payment_type,city from test_dev_db.transactionmainhistorytable where country ='United Kingdom' and tran_date='2009-06-01' and city='London' """)
== Physical Plan ==
*(1) Project [transaction_date#104, payment_type#107, city#109]
+- *(1) Filter (isnotnull(city#109) && (city#109 = London))
+- *(1) FileScan orc test_dev_db.transactionmainhistorytable[transaction_date#104,payment_type#107,city#109,country#111,tran_date#112]
Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://host/user/vikct001/dev/hadoop/database/test_dev...,
PartitionCount: 1, PartitionFilters: [isnotnull(country#111), isnotnull(tran_date#112), (country#111 = United Kingdom), (tran_date#112...,
PushedFilters: [IsNotNull(city), EqualTo(city,London)], ReadSchema: struct<transaction_date:timestamp,payment_type:string,city:string>
上面可以看到 PushedFilters
不为空,它有一个国家列,其中有一个特定的值需要过滤。
我正在使用 Spark SQL 查询 Hive 中以 ORC 格式存储的数据。
当我对提供给 spark.sql(query)
的查询进行 运行 解释命令时,我看到以下查询计划:
== Physical Plan ==
*Project [col1, col2, col3]
+- *Filter (....)
+- HiveTableScan [col1, col2, col3, ...col50]
据我所知,从 Hive 查询所有 50 列,然后才在 Spark 中进行过滤,后记仅选择所需的实际列。
是否可以将所需的列直接下推到 Hive,这样它们就不会一直加载到 Spark?
检查以下属性是否设置为默认值或 false?
spark.sql("SET spark.sql.orc.enabled=true");
spark.sql("SET spark.sql.hive.convertMetastoreOrc=true")
spark.sql("SET spark.sql.orc.filterPushdown=true")
这些可以帮助您避免读取不必要的列,并在您的数据分布在其中时利用 Hive orc table 的分区修剪 hdfs 上的不同分区。
将以上属性设置为 'true' 并查看您的解释计划显示的内容。
您还可以使用 Spark 的 orc 格式分区修剪获益,因为它不需要扫描整个 table 并且可以限制 Spark 在查询时需要的分区数量。它将有助于减少磁盘 input/ouput 操作。
例如:
我在下面的语句运行 中从 Hive orc 文件格式 table 创建一个数据框,该文件在列 'country'
& 'tran_date'
.
df=spark.sql("""select transaction_date,payment_type,city from test_dev_db.transactionmainhistorytable where country ='United Kingdom' and tran_date='2009-06-01' """)
给定的 table 有多个分区,如果我们查看上述查询的物理计划,我们可以看到它只扫描了一个分区。
== Physical Plan ==
*(1) Project [transaction_date#69, payment_type#72, city#74]
+- *(1) FileScan orc test_dev_db.transactionmainhistorytable[transaction_date#69,payment_type#72,city#74,country#76,tran_date#77]
Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://host/user/vikct001/dev/hadoop/database/test_dev...,
*PartitionCount: 1,* PartitionFilters: [isnotnull(country#76), isnotnull(tran_date#77), (country#76 = United Kingdom), (tran_date#77 = 2...,
PushedFilters: [], ReadSchema: struct<transaction_date:timestamp,payment_type:string,city:string>
请参阅 "PartitionCount: 1"
并将 PartitionFilters 设置为不为空。
同样,如果您在查询中指定了任何过滤器,则可以下推过滤器。 在这里,就像我使用城市列来过滤数据。
df=spark.sql("""select transaction_date,payment_type,city from test_dev_db.transactionmainhistorytable where country ='United Kingdom' and tran_date='2009-06-01' and city='London' """)
== Physical Plan ==
*(1) Project [transaction_date#104, payment_type#107, city#109]
+- *(1) Filter (isnotnull(city#109) && (city#109 = London))
+- *(1) FileScan orc test_dev_db.transactionmainhistorytable[transaction_date#104,payment_type#107,city#109,country#111,tran_date#112]
Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://host/user/vikct001/dev/hadoop/database/test_dev...,
PartitionCount: 1, PartitionFilters: [isnotnull(country#111), isnotnull(tran_date#112), (country#111 = United Kingdom), (tran_date#112...,
PushedFilters: [IsNotNull(city), EqualTo(city,London)], ReadSchema: struct<transaction_date:timestamp,payment_type:string,city:string>
上面可以看到 PushedFilters
不为空,它有一个国家列,其中有一个特定的值需要过滤。