Pyspark:从 JSON 文件创建模式
Pyspark: create a schema from JSON file
我正在处理来自非常长的嵌套 JSON 文件的数据。问题是,这些文件的结构并不总是相同,因为其中一些文件错过了其他文件的列。我想从包含所有列的空 JSON 文件创建自定义架构。如果我稍后将 JSON 文件读入此预定义模式,则不存在的列将填充空值(至少计划如此)。到目前为止我做了什么:
- 正在将测试JSON(不包含预期的所有列)加载到数据框中
- 将其架构写入 JSON 文件
- 在文本编辑器中打开此 JSON 文件并手动添加缺失的列
接下来我想做的是通过将 JSON 文件读入我的代码来创建一个新模式,但我在语法方面遇到了困难。我可以直接从文件本身读取架构吗?我试过了
schemaFromJson = StructType.fromJson(json.loads('filepath/spark-schema.json'))
但它给了我 TypeError: init() missing 2 required positional arguments: 'doc' and 'pos'
知道我当前的代码有什么问题吗?
非常感谢
编辑:
我遇到了这个 link sparkbyexamples.com/pyspark/pyspark-structtype-and-structfield 。第 7 章几乎描述了我遇到的问题。我只是不明白如何解析我手动增强为 schemaFromJson = StructType.fromJson(json.loads(schema.json)).
的 json 文件
当我这样做时:
jsonDF = spark.read.json(filesToLoad)
schema = jsonDF.schema.json()
schemaNew = StructType.fromJson(json.loads(schema))
jsonDF2 = spark.read.schema(schemaNew).json(filesToLoad)
代码贯穿,但显然没有用,因为jsonDF和jsonDF2确实有相同的content/schema。我想要实现的是向 'schema' 添加一些列,然后这些列将反映在 'schemaNew'.
为什么不定义一个空的 DF,其中包含 JSON 文件可以包含的所有列?然后将 JSON 加载到其中。这是一个想法:
对于 Spark 3.1.0:
from pyspark.sql.types import *
schema = StructType([
StructField("fruit",StringType(),True),
StructField("size",StringType(),True),
StructField("color",StringType(),True)
])
df = spark.createDataFrame([], schema)
json_file_1 = {"fruit": "Apple","size": "Large"}
json_df_1 = spark.read.json(sc.parallelize([json_file_1]))
df = df.unionByName(json_df_1, allowMissingColumns=True)
json_file_2 = {"fruit": "Banana","size": "Small","color": "Yellow"}
df = df.unionByName(json_file_2, allowMissingColumns=True)
display(df)
我想我明白了。架构路径包含已经增强的架构:
schemapath = '/path/spark-schema.json'
with open(schemapath) as f:
d = json.load(f)
schemaNew = StructType.fromJson(d)
jsonDf2 = spark.read.schema(schmaNew).json(filesToLoad)
jsonDF2.printSchema()
我正在处理来自非常长的嵌套 JSON 文件的数据。问题是,这些文件的结构并不总是相同,因为其中一些文件错过了其他文件的列。我想从包含所有列的空 JSON 文件创建自定义架构。如果我稍后将 JSON 文件读入此预定义模式,则不存在的列将填充空值(至少计划如此)。到目前为止我做了什么:
- 正在将测试JSON(不包含预期的所有列)加载到数据框中
- 将其架构写入 JSON 文件
- 在文本编辑器中打开此 JSON 文件并手动添加缺失的列
接下来我想做的是通过将 JSON 文件读入我的代码来创建一个新模式,但我在语法方面遇到了困难。我可以直接从文件本身读取架构吗?我试过了
schemaFromJson = StructType.fromJson(json.loads('filepath/spark-schema.json'))
但它给了我 TypeError: init() missing 2 required positional arguments: 'doc' and 'pos'
知道我当前的代码有什么问题吗? 非常感谢
编辑: 我遇到了这个 link sparkbyexamples.com/pyspark/pyspark-structtype-and-structfield 。第 7 章几乎描述了我遇到的问题。我只是不明白如何解析我手动增强为 schemaFromJson = StructType.fromJson(json.loads(schema.json)).
的 json 文件当我这样做时:
jsonDF = spark.read.json(filesToLoad)
schema = jsonDF.schema.json()
schemaNew = StructType.fromJson(json.loads(schema))
jsonDF2 = spark.read.schema(schemaNew).json(filesToLoad)
代码贯穿,但显然没有用,因为jsonDF和jsonDF2确实有相同的content/schema。我想要实现的是向 'schema' 添加一些列,然后这些列将反映在 'schemaNew'.
为什么不定义一个空的 DF,其中包含 JSON 文件可以包含的所有列?然后将 JSON 加载到其中。这是一个想法:
对于 Spark 3.1.0:
from pyspark.sql.types import *
schema = StructType([
StructField("fruit",StringType(),True),
StructField("size",StringType(),True),
StructField("color",StringType(),True)
])
df = spark.createDataFrame([], schema)
json_file_1 = {"fruit": "Apple","size": "Large"}
json_df_1 = spark.read.json(sc.parallelize([json_file_1]))
df = df.unionByName(json_df_1, allowMissingColumns=True)
json_file_2 = {"fruit": "Banana","size": "Small","color": "Yellow"}
df = df.unionByName(json_file_2, allowMissingColumns=True)
display(df)
我想我明白了。架构路径包含已经增强的架构:
schemapath = '/path/spark-schema.json'
with open(schemapath) as f:
d = json.load(f)
schemaNew = StructType.fromJson(d)
jsonDf2 = spark.read.schema(schmaNew).json(filesToLoad)
jsonDF2.printSchema()