在 Hive 中将 Spark 数据帧保存为动态分区 table

Save Spark dataframe as dynamic partitioned table in Hive

我有一个示例应用程序可以将 csv 文件读取到数据框中。可以使用以下方法将数据帧以镶木地板格式存储到 Hive table df.saveAsTable(tablename,mode)

上面的代码工作正常,但我每天都有太多数据,所以我想根据创建日期(table 中的列)对配置单元 table 进行动态分区。

有什么方法可以动态分区数据帧并将其存储到配置单元仓库中。想要避免使用 hivesqlcontext.sql(insert into table partittioin by(date)....).

对插入语句进行硬编码

问题可以看作是对:How to save DataFrame directly to Hive?

的扩展

非常感谢任何帮助。

我相信它是这样工作的:

df是一个包含年月等列的dataframe

df.write.partitionBy('year', 'month').saveAsTable(...)

df.write.partitionBy('year', 'month').insertInto(...)

我能够使用 df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table")

写入分区配置单元 table

我必须启用以下属性才能使其正常工作。

hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

我也遇到过同样的事情,但使用以下技巧我解决了。

  1. 当我们将任何 table 做为分区时,分区列将区分大小写。

  2. DataFrame 中应存在分区列且名称相同(区分大小写)。代码:

    var dbName="your database name"
    var finaltable="your table name"
    
    // First check if table is available or not..
    if (sparkSession.sql("show tables in " + dbName).filter("tableName='" +finaltable + "'").collect().length == 0) {
         //If table is not available then it will create for you..
         println("Table Not Present \n  Creating table " + finaltable)
         sparkSession.sql("use Database_Name")
         sparkSession.sql("SET hive.exec.dynamic.partition = true")
         sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")
         sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
         sparkSession.sql("create table " + dbName +"." + finaltable + "(EMP_ID        string,EMP_Name          string,EMP_Address               string,EMP_Salary    bigint)  PARTITIONED BY (EMP_DEP STRING)")
         //Table is created now insert the DataFrame in append Mode
         df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable)
    }
    

这对我有用。我设置这些设置,然后将数据放在分区表中。

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", 
"nonstrict")

使用 python 和 spark 2.1.0 这对我有用。

不确定这是否是最好的方法,但它确实有效...

# WRITE DATA INTO A HIVE TABLE
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()

### CREATE HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS hive_df (col1 INT, col2 STRING, partition_bin INT)
USING HIVE OPTIONS(fileFormat 'PARQUET')
PARTITIONED BY (partition_bin)
LOCATION 'hive_df'
""")
spark.sql("""
INSERT INTO hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###

### CREATE NON HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS non_hive_df (col1 INT, col2 STRING, partition_bin INT)
USING PARQUET
PARTITIONED BY (partition_bin)
LOCATION 'non_hive_df'
""")
spark.sql("""
INSERT INTO non_hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###

### ATTEMPT DYNAMIC OVERWRITE WITH EACH TABLE
spark.sql("""
INSERT OVERWRITE TABLE hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")
spark.sql("""
INSERT OVERWRITE TABLE non_hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")

spark.sql("SELECT * FROM hive_df").show() # 2 row dynamic overwrite
spark.sql("SELECT * FROM non_hive_df").show() # 1 row full table overwrite

它可以在 SparkSession 上以这种方式配置:

spark = SparkSession \
    .builder \
    ...
    .config("spark.hadoop.hive.exec.dynamic.partition", "true") \
    .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()

或者您可以将它们添加到 .properties 文件

Spark 配置需要 spark.hadoop 前缀(至少在 2.4 中),以下是 Spark 设置此配置的方式:

  /**
   * Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop
   * configuration without the spark.hadoop. prefix.
   */
  def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
    SparkHadoopUtil.appendSparkHadoopConfigs(conf, hadoopConf)
  }
df1.write
   .mode("append")
   .format('ORC')
   .partitionBy("date")
   .option('path', '/hdfs_path')
   .saveAsTable("DB.Partition_tablename")

它将创建具有“日期”列值的分区,并且还将在来自 spark DF 的配置单元中写入 Hive External Table。