使用 parquet 文件元数据创建配置单元 table
Creating hive table using parquet file metadata
我写了一个 DataFrame 作为 parquet 文件。而且,我想使用镶木地板中的元数据使用 Hive 读取文件。
写入 parquet write 的输出
_common_metadata part-r-00000-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet part-r-00002-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet _SUCCESS
_metadata part-r-00001-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet part-r-00003-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet
蜂巢table
CREATE TABLE testhive
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'/home/gz_files/result';
FAILED: SemanticException [Error 10043]: Either list of columns or a custom serializer should be specified
如何从 parquet 文件中推断出元数据?
如果我打开 _common_metadata
我有以下内容,
PAR1LHroot
%TSN%
%TS%
%Etype%
)org.apache.spark.sql.parquet.row.metadata▒{"type":"struct","fields":[{"name":"TSN","type":"string","nullable":true,"metadata":{}},{"name":"TS","type":"string","nullable":true,"metadata":{}},{"name":"Etype","type":"string","nullable":true,"metadata":{}}]}
或者如何解析元数据文件?
我也有同样的问题。不过,从实用的角度可能很难实现,因为 Parquet 支持模式演化:
例如,您可以向 table 添加一个新列,而不必触及 table 中已有的数据。只有新的数据文件才会有新的元数据(与以前的版本兼容)。
自 Spark 1.5.0 以来模式合并默认关闭,因为它是 "relatively expensive operation"
http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
因此,推断最近的模式可能并不像听起来那么简单。尽管快速而肮脏的方法是很有可能的,例如通过解析
的输出
$ parquet-tools schema /home/gz_files/result/000000_0
实际上,Impala支持
CREATE TABLE LIKE PARQUET
(完全没有列部分):
https://docs.cloudera.com/runtime/7.2.15/impala-sql-reference/topics/impala-create-table.html
你的问题的标签有“hive”和“spark”,我没有看到 Hive 中实现了这个,但如果你使用 CDH,它可能就是你要找的。
这是我想出的解决方案,可以从 parquet 文件中获取元数据以创建 Hive table。
首先启动一个 spark-shell(或者将其全部编译成一个 Jar,然后 运行 使用 spark-submit,但是 shell 更容易)
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
val df=sqlContext.parquetFile("/path/to/_common_metadata")
def creatingTableDDL(tableName:String, df:DataFrame): String={
val cols = df.dtypes
var ddl1 = "CREATE EXTERNAL TABLE "+tableName + " ("
//looks at the datatypes and columns names and puts them into a string
val colCreate = (for (c <-cols) yield(c._1+" "+c._2.replace("Type",""))).mkString(", ")
ddl1 += colCreate + ") STORED AS PARQUET LOCATION '/wherever/you/store/the/data/'"
ddl1
}
val test_tableDDL=creatingTableDDL("test_table",df,"test_db")
它将为您提供 Hive 将用于每个列的数据类型,因为它们存储在 Parquet 中。
例如:CREATE EXTERNAL TABLE test_table (COL1 Decimal(38,10), COL2 String, COL3 Timestamp) STORED AS PARQUET LOCATION '/path/to/parquet/files'
我想进一步说明 James Tobin 的回答。有一个 StructField class 提供 Hive 的数据类型而不进行字符串替换。
// Tested on Spark 1.6.0.
import org.apache.spark.sql.DataFrame
def dataFrameToDDL(dataFrame: DataFrame, tableName: String): String = {
val columns = dataFrame.schema.map { field =>
" " + field.name + " " + field.dataType.simpleString.toUpperCase
}
s"CREATE TABLE $tableName (\n${columns.mkString(",\n")}\n)"
}
这解决了 IntegerType 问题。
scala> val dataFrame = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("x", "y")
dataFrame: org.apache.spark.sql.DataFrame = [x: int, y: string]
scala> print(dataFrameToDDL(dataFrame, "t"))
CREATE TABLE t (
x INT,
y STRING
)
这应该适用于任何 DataFrame,而不仅仅是 Parquet。 (例如,我将其与 JDBC DataFrame 一起使用。)
作为额外的好处,如果您的目标 DDL 支持可为空的列,您可以通过检查 StructField.nullable
来扩展该功能。
对 Victor 的小改进(在 field.name 上添加引号)并修改为将 table 绑定到本地镶木地板文件(在 spark 1.6.1 上测试)
def dataFrameToDDL(dataFrame: DataFrame, tableName: String, absFilePath: String): String = {
val columns = dataFrame.schema.map { field =>
" `" + field.name + "` " + field.dataType.simpleString.toUpperCase
}
s"CREATE EXTERNAL TABLE $tableName (\n${columns.mkString(",\n")}\n) STORED AS PARQUET LOCATION '"+absFilePath+"'"
}
另请注意:
- 需要一个 HiveContext,因为 SQLContext 不支持创建
外部 table.
- parquet 文件夹的路径必须是绝对路径
我想扩展 James 的答案,
以下代码适用于所有数据类型,包括 ARRAY、MAP 和 STRUCT。
已在 SPARK 2.2 中测试
val df=sqlContext.parquetFile("parquetFilePath")
val schema = df.schema
var columns = schema.fields
var ddl1 = "CREATE EXTERNAL TABLE " tableName + " ("
val cols=(for(column <- columns) yield column.name+" "+column.dataType.sql).mkString(",")
ddl1=ddl1+cols+" ) STORED AS PARQUET LOCATION '/tmp/hive_test1/'"
spark.sql(ddl1)
我写了一个 DataFrame 作为 parquet 文件。而且,我想使用镶木地板中的元数据使用 Hive 读取文件。
写入 parquet write 的输出
_common_metadata part-r-00000-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet part-r-00002-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet _SUCCESS
_metadata part-r-00001-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet part-r-00003-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet
蜂巢table
CREATE TABLE testhive
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'/home/gz_files/result';
FAILED: SemanticException [Error 10043]: Either list of columns or a custom serializer should be specified
如何从 parquet 文件中推断出元数据?
如果我打开 _common_metadata
我有以下内容,
PAR1LHroot
%TSN%
%TS%
%Etype%
)org.apache.spark.sql.parquet.row.metadata▒{"type":"struct","fields":[{"name":"TSN","type":"string","nullable":true,"metadata":{}},{"name":"TS","type":"string","nullable":true,"metadata":{}},{"name":"Etype","type":"string","nullable":true,"metadata":{}}]}
或者如何解析元数据文件?
我也有同样的问题。不过,从实用的角度可能很难实现,因为 Parquet 支持模式演化:
例如,您可以向 table 添加一个新列,而不必触及 table 中已有的数据。只有新的数据文件才会有新的元数据(与以前的版本兼容)。
自 Spark 1.5.0 以来模式合并默认关闭,因为它是 "relatively expensive operation" http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging 因此,推断最近的模式可能并不像听起来那么简单。尽管快速而肮脏的方法是很有可能的,例如通过解析
的输出$ parquet-tools schema /home/gz_files/result/000000_0
实际上,Impala支持
CREATE TABLE LIKE PARQUET
(完全没有列部分):
https://docs.cloudera.com/runtime/7.2.15/impala-sql-reference/topics/impala-create-table.html
你的问题的标签有“hive”和“spark”,我没有看到 Hive 中实现了这个,但如果你使用 CDH,它可能就是你要找的。
这是我想出的解决方案,可以从 parquet 文件中获取元数据以创建 Hive table。
首先启动一个 spark-shell(或者将其全部编译成一个 Jar,然后 运行 使用 spark-submit,但是 shell 更容易)
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
val df=sqlContext.parquetFile("/path/to/_common_metadata")
def creatingTableDDL(tableName:String, df:DataFrame): String={
val cols = df.dtypes
var ddl1 = "CREATE EXTERNAL TABLE "+tableName + " ("
//looks at the datatypes and columns names and puts them into a string
val colCreate = (for (c <-cols) yield(c._1+" "+c._2.replace("Type",""))).mkString(", ")
ddl1 += colCreate + ") STORED AS PARQUET LOCATION '/wherever/you/store/the/data/'"
ddl1
}
val test_tableDDL=creatingTableDDL("test_table",df,"test_db")
它将为您提供 Hive 将用于每个列的数据类型,因为它们存储在 Parquet 中。
例如:CREATE EXTERNAL TABLE test_table (COL1 Decimal(38,10), COL2 String, COL3 Timestamp) STORED AS PARQUET LOCATION '/path/to/parquet/files'
我想进一步说明 James Tobin 的回答。有一个 StructField class 提供 Hive 的数据类型而不进行字符串替换。
// Tested on Spark 1.6.0.
import org.apache.spark.sql.DataFrame
def dataFrameToDDL(dataFrame: DataFrame, tableName: String): String = {
val columns = dataFrame.schema.map { field =>
" " + field.name + " " + field.dataType.simpleString.toUpperCase
}
s"CREATE TABLE $tableName (\n${columns.mkString(",\n")}\n)"
}
这解决了 IntegerType 问题。
scala> val dataFrame = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("x", "y")
dataFrame: org.apache.spark.sql.DataFrame = [x: int, y: string]
scala> print(dataFrameToDDL(dataFrame, "t"))
CREATE TABLE t (
x INT,
y STRING
)
这应该适用于任何 DataFrame,而不仅仅是 Parquet。 (例如,我将其与 JDBC DataFrame 一起使用。)
作为额外的好处,如果您的目标 DDL 支持可为空的列,您可以通过检查 StructField.nullable
来扩展该功能。
对 Victor 的小改进(在 field.name 上添加引号)并修改为将 table 绑定到本地镶木地板文件(在 spark 1.6.1 上测试)
def dataFrameToDDL(dataFrame: DataFrame, tableName: String, absFilePath: String): String = {
val columns = dataFrame.schema.map { field =>
" `" + field.name + "` " + field.dataType.simpleString.toUpperCase
}
s"CREATE EXTERNAL TABLE $tableName (\n${columns.mkString(",\n")}\n) STORED AS PARQUET LOCATION '"+absFilePath+"'"
}
另请注意:
- 需要一个 HiveContext,因为 SQLContext 不支持创建 外部 table.
- parquet 文件夹的路径必须是绝对路径
我想扩展 James 的答案,
以下代码适用于所有数据类型,包括 ARRAY、MAP 和 STRUCT。
已在 SPARK 2.2 中测试
val df=sqlContext.parquetFile("parquetFilePath")
val schema = df.schema
var columns = schema.fields
var ddl1 = "CREATE EXTERNAL TABLE " tableName + " ("
val cols=(for(column <- columns) yield column.name+" "+column.dataType.sql).mkString(",")
ddl1=ddl1+cols+" ) STORED AS PARQUET LOCATION '/tmp/hive_test1/'"
spark.sql(ddl1)