在 Hive 中将 Spark 数据帧保存为动态分区 table
Save Spark dataframe as dynamic partitioned table in Hive
我有一个示例应用程序可以将 csv 文件读取到数据框中。可以使用以下方法将数据帧以镶木地板格式存储到 Hive table
df.saveAsTable(tablename,mode)
。
上面的代码工作正常,但我每天都有太多数据,所以我想根据创建日期(table 中的列)对配置单元 table 进行动态分区。
有什么方法可以动态分区数据帧并将其存储到配置单元仓库中。想要避免使用 hivesqlcontext.sql(insert into table partittioin by(date)....)
.
对插入语句进行硬编码
问题可以看作是对:How to save DataFrame directly to Hive?
的扩展
非常感谢任何帮助。
我相信它是这样工作的:
df
是一个包含年月等列的dataframe
df.write.partitionBy('year', 'month').saveAsTable(...)
或
df.write.partitionBy('year', 'month').insertInto(...)
我能够使用 df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table")
写入分区配置单元 table
我必须启用以下属性才能使其正常工作。
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
我也遇到过同样的事情,但使用以下技巧我解决了。
当我们将任何 table 做为分区时,分区列将区分大小写。
DataFrame 中应存在分区列且名称相同(区分大小写)。代码:
var dbName="your database name"
var finaltable="your table name"
// First check if table is available or not..
if (sparkSession.sql("show tables in " + dbName).filter("tableName='" +finaltable + "'").collect().length == 0) {
//If table is not available then it will create for you..
println("Table Not Present \n Creating table " + finaltable)
sparkSession.sql("use Database_Name")
sparkSession.sql("SET hive.exec.dynamic.partition = true")
sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")
sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
sparkSession.sql("create table " + dbName +"." + finaltable + "(EMP_ID string,EMP_Name string,EMP_Address string,EMP_Salary bigint) PARTITIONED BY (EMP_DEP STRING)")
//Table is created now insert the DataFrame in append Mode
df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable)
}
这对我有用。我设置这些设置,然后将数据放在分区表中。
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode",
"nonstrict")
使用 python 和 spark 2.1.0 这对我有用。
不确定这是否是最好的方法,但它确实有效...
# WRITE DATA INTO A HIVE TABLE
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.master("local[*]") \
.config("hive.exec.dynamic.partition", "true") \
.config("hive.exec.dynamic.partition.mode", "nonstrict") \
.enableHiveSupport() \
.getOrCreate()
### CREATE HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS hive_df (col1 INT, col2 STRING, partition_bin INT)
USING HIVE OPTIONS(fileFormat 'PARQUET')
PARTITIONED BY (partition_bin)
LOCATION 'hive_df'
""")
spark.sql("""
INSERT INTO hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###
### CREATE NON HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS non_hive_df (col1 INT, col2 STRING, partition_bin INT)
USING PARQUET
PARTITIONED BY (partition_bin)
LOCATION 'non_hive_df'
""")
spark.sql("""
INSERT INTO non_hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###
### ATTEMPT DYNAMIC OVERWRITE WITH EACH TABLE
spark.sql("""
INSERT OVERWRITE TABLE hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")
spark.sql("""
INSERT OVERWRITE TABLE non_hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")
spark.sql("SELECT * FROM hive_df").show() # 2 row dynamic overwrite
spark.sql("SELECT * FROM non_hive_df").show() # 1 row full table overwrite
它可以在 SparkSession
上以这种方式配置:
spark = SparkSession \
.builder \
...
.config("spark.hadoop.hive.exec.dynamic.partition", "true") \
.config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict") \
.enableHiveSupport() \
.getOrCreate()
或者您可以将它们添加到 .properties 文件
Spark 配置需要 spark.hadoop
前缀(至少在 2.4 中),以下是 Spark 设置此配置的方式:
/**
* Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop
* configuration without the spark.hadoop. prefix.
*/
def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
SparkHadoopUtil.appendSparkHadoopConfigs(conf, hadoopConf)
}
df1.write
.mode("append")
.format('ORC')
.partitionBy("date")
.option('path', '/hdfs_path')
.saveAsTable("DB.Partition_tablename")
它将创建具有“日期”列值的分区,并且还将在来自 spark DF 的配置单元中写入 Hive External Table。
我有一个示例应用程序可以将 csv 文件读取到数据框中。可以使用以下方法将数据帧以镶木地板格式存储到 Hive table
df.saveAsTable(tablename,mode)
。
上面的代码工作正常,但我每天都有太多数据,所以我想根据创建日期(table 中的列)对配置单元 table 进行动态分区。
有什么方法可以动态分区数据帧并将其存储到配置单元仓库中。想要避免使用 hivesqlcontext.sql(insert into table partittioin by(date)....)
.
问题可以看作是对:How to save DataFrame directly to Hive?
的扩展非常感谢任何帮助。
我相信它是这样工作的:
df
是一个包含年月等列的dataframe
df.write.partitionBy('year', 'month').saveAsTable(...)
或
df.write.partitionBy('year', 'month').insertInto(...)
我能够使用 df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table")
我必须启用以下属性才能使其正常工作。
hiveContext.setConf("hive.exec.dynamic.partition", "true") hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
我也遇到过同样的事情,但使用以下技巧我解决了。
当我们将任何 table 做为分区时,分区列将区分大小写。
DataFrame 中应存在分区列且名称相同(区分大小写)。代码:
var dbName="your database name" var finaltable="your table name" // First check if table is available or not.. if (sparkSession.sql("show tables in " + dbName).filter("tableName='" +finaltable + "'").collect().length == 0) { //If table is not available then it will create for you.. println("Table Not Present \n Creating table " + finaltable) sparkSession.sql("use Database_Name") sparkSession.sql("SET hive.exec.dynamic.partition = true") sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict ") sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400") sparkSession.sql("create table " + dbName +"." + finaltable + "(EMP_ID string,EMP_Name string,EMP_Address string,EMP_Salary bigint) PARTITIONED BY (EMP_DEP STRING)") //Table is created now insert the DataFrame in append Mode df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable) }
这对我有用。我设置这些设置,然后将数据放在分区表中。
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode",
"nonstrict")
使用 python 和 spark 2.1.0 这对我有用。
不确定这是否是最好的方法,但它确实有效...
# WRITE DATA INTO A HIVE TABLE
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.master("local[*]") \
.config("hive.exec.dynamic.partition", "true") \
.config("hive.exec.dynamic.partition.mode", "nonstrict") \
.enableHiveSupport() \
.getOrCreate()
### CREATE HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS hive_df (col1 INT, col2 STRING, partition_bin INT)
USING HIVE OPTIONS(fileFormat 'PARQUET')
PARTITIONED BY (partition_bin)
LOCATION 'hive_df'
""")
spark.sql("""
INSERT INTO hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###
### CREATE NON HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS non_hive_df (col1 INT, col2 STRING, partition_bin INT)
USING PARQUET
PARTITIONED BY (partition_bin)
LOCATION 'non_hive_df'
""")
spark.sql("""
INSERT INTO non_hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###
### ATTEMPT DYNAMIC OVERWRITE WITH EACH TABLE
spark.sql("""
INSERT OVERWRITE TABLE hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")
spark.sql("""
INSERT OVERWRITE TABLE non_hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")
spark.sql("SELECT * FROM hive_df").show() # 2 row dynamic overwrite
spark.sql("SELECT * FROM non_hive_df").show() # 1 row full table overwrite
它可以在 SparkSession
上以这种方式配置:
spark = SparkSession \
.builder \
...
.config("spark.hadoop.hive.exec.dynamic.partition", "true") \
.config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict") \
.enableHiveSupport() \
.getOrCreate()
或者您可以将它们添加到 .properties 文件
Spark 配置需要 spark.hadoop
前缀(至少在 2.4 中),以下是 Spark 设置此配置的方式:
/**
* Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop
* configuration without the spark.hadoop. prefix.
*/
def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
SparkHadoopUtil.appendSparkHadoopConfigs(conf, hadoopConf)
}
df1.write
.mode("append")
.format('ORC')
.partitionBy("date")
.option('path', '/hdfs_path')
.saveAsTable("DB.Partition_tablename")
它将创建具有“日期”列值的分区,并且还将在来自 spark DF 的配置单元中写入 Hive External Table。