Spark SQL HiveContext - saveAsTable 创建了错误的模式
Spark SQL HiveContext - saveAsTable creates wrong schema
我尝试将 Dataframe 存储到 Spark 1.3.0 (PySpark) 中的持久 Hive table。这是我的代码:
sc = SparkContext(appName="HiveTest")
hc = HiveContext(sc)
peopleRDD = sc.parallelize(['{"name":"Yin","age":30}'])
peopleDF = hc.jsonRDD(peopleRDD)
peopleDF.printSchema()
#root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
peopleDF.saveAsTable("peopleHive")
我期望的 Hive 输出 table 是:
Column Data Type Comments
age long from deserializer
name string from deserializer
但是上面代码的实际输出Hivetable是:
Column Data Type Comments
col array<string> from deserializer
为什么 Hive table 与 DataFrame 的架构不同?如何达到预期的输出?
不是架构错误。 Hive 无法正确读取由 Spark 创建的 table,因为它甚至还没有正确的 parquet serde。
如果您执行 sqlCtx.sql('desc peopleHive').show()
,它应该显示正确的架构。
或者您可以使用 spark-sql 客户端而不是 hive。您还可以使用 create table 语法创建外部 tables,其工作方式与 Hive 类似,但 Spark 对 parquet 的支持要好得多。
覆盖类行为的解决方法是
val viewName = "tempView"
df.createTempView(viewName)
df.sparkSession.sql(s"DROP TABLE IF EXISTS ${tableName}")
df.sparkSession.sql(
s"""CREATE TABLE $tableName AS
| select * from $viewName
""".stripMargin)
df.sparkSession.catalog.dropTempView(viewName)
我尝试将 Dataframe 存储到 Spark 1.3.0 (PySpark) 中的持久 Hive table。这是我的代码:
sc = SparkContext(appName="HiveTest")
hc = HiveContext(sc)
peopleRDD = sc.parallelize(['{"name":"Yin","age":30}'])
peopleDF = hc.jsonRDD(peopleRDD)
peopleDF.printSchema()
#root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
peopleDF.saveAsTable("peopleHive")
我期望的 Hive 输出 table 是:
Column Data Type Comments
age long from deserializer
name string from deserializer
但是上面代码的实际输出Hivetable是:
Column Data Type Comments
col array<string> from deserializer
为什么 Hive table 与 DataFrame 的架构不同?如何达到预期的输出?
不是架构错误。 Hive 无法正确读取由 Spark 创建的 table,因为它甚至还没有正确的 parquet serde。
如果您执行 sqlCtx.sql('desc peopleHive').show()
,它应该显示正确的架构。
或者您可以使用 spark-sql 客户端而不是 hive。您还可以使用 create table 语法创建外部 tables,其工作方式与 Hive 类似,但 Spark 对 parquet 的支持要好得多。
覆盖类行为的解决方法是
val viewName = "tempView"
df.createTempView(viewName)
df.sparkSession.sql(s"DROP TABLE IF EXISTS ${tableName}")
df.sparkSession.sql(
s"""CREATE TABLE $tableName AS
| select * from $viewName
""".stripMargin)
df.sparkSession.catalog.dropTempView(viewName)