在 HDP2.6 中将 Spark Dataframe 写入 Hive 可访问 table
Write Spark Dataframe to Hive accessible table in HDP2.6
我知道已经有很多关于从 Spark 写入 HIVE 的答案,但似乎没有一个对我有用。首先是一些背景。这是一个较旧的集群,运行ning HDP2.6,即 Hive2 和 Spark 2.1。
这里是一个示例程序:
case class Record(key: Int, value: String)
val spark = SparkSession.builder()
.appName("Test App")
.config("spark.sql.warehouse.dir", "/app/hive/warehouse")
.enableHiveSupport()
.getOrCreate()
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
records.write.saveAsTable("records_table")
如果我登录到 spark-shell 和 运行 该代码,一个名为 records_table 的新 table 会出现在 Hive 中。但是,如果我将该代码部署在一个 jar 中,并使用 spark-submit 将其提交到集群,我将看到 table 出现在与所有其他 HIVE table 相同的 HDFS 位置, 但 HIVE 无法访问它。
我知道在 HDP 3.1 中你必须使用 HiveWarehouseConnector class,但我在 HDP 2.6 中找不到任何关于它的参考。有些人提到了 HiveContext class,而其他人则说只使用 SparkSessionBuilder 中的 enableHiveSupport
调用。我已经尝试了这两种方法,但似乎都不起作用。我试过了saveAsTable
。我试过了insertInto
。我什至尝试创建一个临时视图,然后 hiveContext.sql("create table if not exists mytable as select * from tmptable")。每次尝试,我都会在 hdfs:/apps/hive/warehouse 中获得一个镶木地板文件,但我无法从 HIVE 本身访问该 table。
根据提供的信息,我建议您这样做,
- 创建 Spark 会话,
enableHiveSupport
是必需的,
val spark = SparkSession.builder()
.appName("Test App")
.enableHiveSupport()
.getOrCreate()
- 接下来,通过
spark.sql
、 为table结果table执行DDL
val ddlStr: String =
s"""CREATE EXTERNAL TABLE IF NOT EXISTS records_table(key int, value string)
|ROW FORMAT SERDE
| 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
|STORED AS INPUTFORMAT
| 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
|OUTPUTFORMAT
| 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
|LOCATION '$hdfsLocation'""".stripMargin
spark.sql(ddlStr)
- 根据您的用例写入数据,
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.write.format("orc").insertInto("records_table")
备注:
- spark-shell 和 spark-submit
的工作方式类似
- 分区可以在 DDL 中定义,所以在写入数据帧时不要使用
partitionBy
。
- 不支持分桶/集群。
希望这有帮助/干杯。
我知道已经有很多关于从 Spark 写入 HIVE 的答案,但似乎没有一个对我有用。首先是一些背景。这是一个较旧的集群,运行ning HDP2.6,即 Hive2 和 Spark 2.1。
这里是一个示例程序:
case class Record(key: Int, value: String)
val spark = SparkSession.builder()
.appName("Test App")
.config("spark.sql.warehouse.dir", "/app/hive/warehouse")
.enableHiveSupport()
.getOrCreate()
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
records.write.saveAsTable("records_table")
如果我登录到 spark-shell 和 运行 该代码,一个名为 records_table 的新 table 会出现在 Hive 中。但是,如果我将该代码部署在一个 jar 中,并使用 spark-submit 将其提交到集群,我将看到 table 出现在与所有其他 HIVE table 相同的 HDFS 位置, 但 HIVE 无法访问它。
我知道在 HDP 3.1 中你必须使用 HiveWarehouseConnector class,但我在 HDP 2.6 中找不到任何关于它的参考。有些人提到了 HiveContext class,而其他人则说只使用 SparkSessionBuilder 中的 enableHiveSupport
调用。我已经尝试了这两种方法,但似乎都不起作用。我试过了saveAsTable
。我试过了insertInto
。我什至尝试创建一个临时视图,然后 hiveContext.sql("create table if not exists mytable as select * from tmptable")。每次尝试,我都会在 hdfs:/apps/hive/warehouse 中获得一个镶木地板文件,但我无法从 HIVE 本身访问该 table。
根据提供的信息,我建议您这样做,
- 创建 Spark 会话,
enableHiveSupport
是必需的,
val spark = SparkSession.builder()
.appName("Test App")
.enableHiveSupport()
.getOrCreate()
- 接下来,通过
spark.sql
、 为table结果table执行DDL
val ddlStr: String =
s"""CREATE EXTERNAL TABLE IF NOT EXISTS records_table(key int, value string)
|ROW FORMAT SERDE
| 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
|STORED AS INPUTFORMAT
| 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
|OUTPUTFORMAT
| 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
|LOCATION '$hdfsLocation'""".stripMargin
spark.sql(ddlStr)
- 根据您的用例写入数据,
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.write.format("orc").insertInto("records_table")
备注:
- spark-shell 和 spark-submit 的工作方式类似
- 分区可以在 DDL 中定义,所以在写入数据帧时不要使用
partitionBy
。 - 不支持分桶/集群。
希望这有帮助/干杯。