How to programmatically generate a create table statement based on a DataFrame
I'm trying to build functionality that creates a Hive table from the data portion of some avro data. The schema of the source data is shown below. The target table needs to be partitioned by the partition field from the source data and have two columns, name and description. I can get the data portion of the source with df.select('data.*') and the table schema with df.select('data.*').schema, but the partition column is not in there. My goal is to produce a table clause like create table mytable (name string, description string) partitioned by (partition integer) stored as parquet. How can I do this? Do I need to append df.select('partition.*') to df.select('data.*') first? (A flattening sketch follows the schema below.) Thanks much for the help.
Edit: the goal is that you shouldn't have to spell out the nesting level of each column, such as data.name; instead you just pass in the "columns" and the "partition column" (which can sit at any nesting level), and a create table statement is generated.
root
 |-- metadata: struct
 |    |-- id: string
 |    |-- time: string
 |-- data: struct
 |    |-- name: string
 |    |-- description: string
 |-- partition: integer
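For reference, since partition is a plain integer column rather than a struct, df.select('partition.*') would fail; a minimal sketch of flattening data while keeping the partition column (assuming df is the DataFrame read from the avro source) is:
# Sketch, not from the answer below: flatten the 'data' struct and keep
# the top-level 'partition' column in a single select.
df_flat = df.select('data.*', 'partition')
df_flat.printSchema()  # columns: name, description, partition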
The following self-contained example shows you how to create and write the table you specified. You will need to provide your own path_for_saving.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample schema mirroring the question, plus a deeply nested partition_alt
# field to show that any nesting level works.
schema = T.StructType([
    T.StructField('metadata', T.StructType([
        T.StructField("id", T.StringType()),
        T.StructField("time", T.StringType())])),
    T.StructField('data', T.StructType([
        T.StructField("name", T.StringType()),
        T.StructField("description", T.StringType()),
    ])),
    T.StructField("partition", T.IntegerType()),
    T.StructField("Level1", T.StructType([
        T.StructField("Level2", T.StructType([
            T.StructField("Level3", T.StructType([
                T.StructField("partition_alt", T.IntegerType())]))]))]))
])

df_sample_data = spark.createDataFrame([
    (("id1", "time1"), ("name1", "desc1"), 1, (((3,),),)),
    (("id2", "time2"), ("name2", "desc2"), 2, (((4,),),))], schema)
df_sample_data.printSchema()
df_sample_data.show()

def parse_fields(schema, path=""):
    # Recursively walk the schema and collect every leaf field
    # as a '.'-separated path.
    collect = []
    for struct_field in schema:
        this_field_name = struct_field.name
        if type(struct_field.dataType) == T.StructType:
            collect = collect + parse_fields(struct_field.dataType, path + this_field_name + ".")
        else:
            collect = collect + [path + this_field_name]
    return collect

parsed_fields = parse_fields(schema)  # Find all leaf fields in the schema and return as '.' separated paths
print("Parsed fields:" + str(parsed_fields))

def get_column(col_name):
    # Return the first leaf field whose full path contains col_name,
    # aliased back to the short name.
    for field in parsed_fields:
        if col_name in field:
            return F.col(field).alias(col_name)

name_col = "name"
description_col = "description"
partition_col = "partition_alt"
df_mytable = df_sample_data.select(get_column(name_col), get_column(description_col), get_column(partition_col))
df_mytable.show()
df_mytable.write.partitionBy(partition_col).format("parquet").save(path_for_saving)
Output:
root
|-- metadata: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- time: string (nullable = true)
|-- data: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- description: string (nullable = true)
|-- partition: integer (nullable = true)
|-- Level1: struct (nullable = true)
| |-- Level2: struct (nullable = true)
| | |-- Level3: struct (nullable = true)
| | | |-- partition_alt: integer (nullable = true)
+------------+--------------+---------+-------+
| metadata| data|partition| Level1|
+------------+--------------+---------+-------+
|{id1, time1}|{name1, desc1}| 1|{{{3}}}|
|{id2, time2}|{name2, desc2}| 2|{{{4}}}|
+------------+--------------+---------+-------+
Parsed fields:['metadata.id', 'metadata.time', 'data.name', 'data.description', 'partition', 'Level1.Level2.Level3.partition_alt']
+-----+-----------+-------------+
| name|description|partition_alt|
+-----+-----------+-------------+
|name1| desc1| 3|
|name2| desc2| 4|
+-----+-----------+-------------+
The example demonstrates how to find deeply nested fields. You will need to rewrite get_column with your own criteria for matching a field name against a full column path; here, get_column simply returns the first field whose path contains col_name.
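The question ultimately asks for a create table statement, which the write above does not emit. A minimal sketch of building that DDL string from df_mytable's schema (build_create_table is a hypothetical helper, not part of the answer above):
def build_create_table(table_name, df, partition_col):
    # Sketch: derive Hive column definitions from the flattened DataFrame's
    # schema; simpleString() maps Spark types to Hive-compatible type names.
    data_cols = [f for f in df.schema.fields if f.name != partition_col]
    part = next(f for f in df.schema.fields if f.name == partition_col)
    cols_sql = ", ".join(f"{f.name} {f.dataType.simpleString()}" for f in data_cols)
    return (f"create table {table_name} ({cols_sql}) "
            f"partitioned by ({part.name} {part.dataType.simpleString()}) "
            f"stored as parquet")

print(build_create_table("mytable", df_mytable, partition_col))
# create table mytable (name string, description string) partitioned by (partition_alt int) stored as parquet
Alternatively, if you just want Spark to register the Hive table itself rather than emitting DDL, df_mytable.write.partitionBy(partition_col).format("parquet").saveAsTable("mytable") should work on a session with Hive support enabled.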