How to programmatically generate a create table statement based on a data frame

I am trying to build functionality that creates a Hive table based on the data portion of some avro data. The schema of the source data is shown below. The target table needs to be partitioned by the partition field in the source data, and has two columns, name and description. I can get at the data portion of the source with df.select('data.*'), and at the table schema with df.select('data.*').schema, but the partition column is not in there. My goal is to produce a create table clause: create table mytable (name string, description string) partitioned by (partition integer) stored as parquet. How do I do that? Do I need to append df.select('partition.*') to df.select('data.*') first? Thanks a lot for your help.

Edited: the goal is that you should not need to specify the nesting level of the columns, like data.name and partition; instead you just pass in the "columns" and the "partition column" (which can be at any nesting level) and generate a create table statement from them.

root
 |-- metadata: struct
 |    |-- id: string
 |    |-- time: string
 |-- data: struct
 |    |-- name: string
 |    |-- description: string
 |-- partition: integer
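
Since partition here happens to be a top-level column, the flattened data portion and the partition column can, in this simple case, be pulled out together in a single select (a minimal sketch, assuming df is the DataFrame read from the avro source):

# Star-expand the `data` struct and carry the top-level `partition` column along.
df_flat = df.select("data.*", "partition")  # columns: name, description, partition
df_flat.printSchema()

The general case, where the partition column may sit at any nesting level, is handled by the example that follows.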

The following self-contained example shows you how to create and write the table you specified. You will need to supply your own path_for_saving.

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

# In a notebook or spark-shell `spark` already exists; this makes the example standalone.
spark = SparkSession.builder.getOrCreate()

schema = T.StructType([
  T.StructField("metadata", T.StructType([
    T.StructField("id", T.StringType()),
    T.StructField("time", T.StringType())])),
  T.StructField("data", T.StructType([
    T.StructField("name", T.StringType()),
    T.StructField("description", T.StringType()),
  ])),
  T.StructField("partition", T.IntegerType()),
  T.StructField("Level1", T.StructType([
    T.StructField("Level2", T.StructType([
      T.StructField("Level3", T.StructType([
        T.StructField("partition_alt", T.IntegerType())]))]))]))
])
# One row per record: (metadata, data, partition, Level1)
df_sample_data = spark.createDataFrame(
  [(("id1", "time1"), ("name1", "desc1"), 1, (((3,),),)),
   (("id2", "time2"), ("name2", "desc2"), 2, (((4,),),))],
  schema)
df_sample_data.printSchema()
df_sample_data.show()

def parse_fields(schema, path=""):
  collect = []
  for struct_field in schema:
    this_field_name = struct_field.name
    if type(struct_field.dataType) == T.StructType:
      collect = collect + parse_fields(struct_field.dataType, path + this_field_name + ".")
    else: 
      collect = collect + [path + this_field_name]
  return collect

parsed_fields = parse_fields(schema)  # Find all leaf fields in the schema, as '.'-separated paths
print("Parsed fields:" + str(parsed_fields))

def get_column(col_name):
  # Return the first leaf field whose path contains col_name, aliased to col_name.
  for field in parsed_fields:
    if col_name in field:
      return F.col(field).alias(col_name)
    
name_col = "name"
description_col = "description"
partition_col = "partition_alt"

df_mytable = df_sample_data.select(get_column(name_col), get_column(description_col), get_column(partition_col))
df_mytable.show()

df_mytable.write.partitionBy(partition_col).format("parquet").save(path_for_saving)

Output:

root
 |-- metadata: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- time: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- description: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- Level1: struct (nullable = true)
 |    |-- Level2: struct (nullable = true)
 |    |    |-- Level3: struct (nullable = true)
 |    |    |    |-- partition_alt: integer (nullable = true)

+------------+--------------+---------+-------+
|    metadata|          data|partition| Level1|
+------------+--------------+---------+-------+
|{id1, time1}|{name1, desc1}|        1|{{{3}}}|
|{id2, time2}|{name2, desc2}|        2|{{{4}}}|
+------------+--------------+---------+-------+

Parsed fields:['metadata.id', 'metadata.time', 'data.name', 'data.description', 'partition', 'Level1.Level2.Level3.partition_alt']
+-----+-----------+-------------+
| name|description|partition_alt|
+-----+-----------+-------------+
|name1|      desc1|            3|
|name2|      desc2|            4|
+-----+-----------+-------------+

The example demonstrates how to find deeply nested fields. You will need to rewrite get_column with your own criteria for matching a field name against a full column path; here, get_column simply returns the first field whose path contains col_name.
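
For instance, a stricter matcher (a hypothetical variant, not part of the answer above) could require an exact match on the last segment of the path, so that "name" cannot accidentally match a field such as data.nickname:

def get_column_exact(col_name):
  # Hypothetical stricter variant: the final path segment must equal col_name exactly.
  for field in parsed_fields:
    if field.split(".")[-1] == col_name:
      return F.col(field).alias(col_name)
  raise ValueError("No leaf field named " + repr(col_name) + " in the schema")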
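
As a follow-up sketch, the create table statement the question asks for can be generated from the flattened schema of df_mytable, assuming Spark's simpleString() type names (string, int, ...) are acceptable as Hive column types:

def build_create_table_ddl(df, table_name, partition_col):
  # Hypothetical helper: split the flattened schema into data columns and the
  # partition column, then render a Hive-style CREATE TABLE statement.
  data_cols = [f for f in df.schema.fields if f.name != partition_col]
  part_field = df.schema[partition_col]
  col_defs = ", ".join(f"{f.name} {f.dataType.simpleString()}" for f in data_cols)
  return (f"create table {table_name} ({col_defs}) "
          f"partitioned by ({part_field.name} {part_field.dataType.simpleString()}) "
          f"stored as parquet")

print(build_create_table_ddl(df_mytable, "mytable", partition_col))
# create table mytable (name string, description string) partitioned by (partition_alt int) stored as parquet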