如何从 PySpark 中的数据框获取架构定义?

How to get the schema definition from a dataframe in PySpark?

在 PySpark 中,您可以定义模式并使用此预定义模式读取数据源,例如。 g.:

Schema = StructType([ StructField("temperature", DoubleType(), True),
                      StructField("temperature_unit", StringType(), True),
                      StructField("humidity", DoubleType(), True),
                      StructField("humidity_unit", StringType(), True),
                      StructField("pressure", DoubleType(), True),
                      StructField("pressure_unit", StringType(), True)
                    ])

对于某些数据源,可以从数据源推断模式并获得具有该模式定义的数据框。

是否可以从之前推断数据的数据框中获取模式定义(以上述形式)?

df.printSchema() 将模式打印为树,但我需要重用模式,如上定义,因此我可以使用之前从另一个数据推断出的模式读取数据源-来源。

是的,这是可能的。使用 DataFrame.schema property

schema

Returns the schema of this DataFrame as a pyspark.sql.types.StructType.

>>> df.schema
StructType(List(StructField(age,IntegerType,true),StructField(name,StringType,true)))

New in version 1.3.

架构 如果需要。

您可以为现有 Dataframe 重新使用架构

l = [('Ankita',25,'F'),('Jalfaizy',22,'M'),('saurabh',20,'M'),('Bala',26,None)]
people_rdd=spark.sparkContext.parallelize(l)
schemaPeople = people_rdd.toDF(['name','age','gender'])

schemaPeople.show()

+--------+---+------+
|    name|age|gender|
+--------+---+------+
|  Ankita| 25|     F|
|Jalfaizy| 22|     M|
| saurabh| 20|     M|
|    Bala| 26|  null|
+--------+---+------+

spark.createDataFrame(people_rdd,schemaPeople.schema).show()

+--------+---+------+
|    name|age|gender|
+--------+---+------+
|  Ankita| 25|     F|
|Jalfaizy| 22|     M|
| saurabh| 20|     M|
|    Bala| 26|  null|
+--------+---+------+

只需使用df.schema获取dataframe的底层模式

schemaPeople.schema

StructType(List(StructField(name,StringType,true),StructField(age,LongType,true),StructField(gender,StringType,true)))

下面的代码将为您提供已知数据框的格式良好的表格架构定义。当您有大量的列并且编辑很麻烦时,这非常有用。然后,您现在可以将它应用于您的新数据框并相应地手动编辑您可能想要的任何列。

from pyspark.sql.types import StructType

schema = [i for i in df.schema] 

然后从这里,你有你的新架构:

NewSchema = StructType(schema)

如果您要从 PySpark 中查找 DDL 字符串:

df: DataFrame = spark.read.load('LOCATION')
schema_json = df.schema.json()
ddl = spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(schema_json).toDDL()