使用 json 文件中包含的架构用于 Python 中的 spark.read()
Using schema contained in a json file for spark.read() in Python
问题:
- 我的模式转换为 json 是否正确?
- 如何传递 json 文件以为 spark.read()
提供架构
我将以下模式硬编码到 python 脚本中,这非常适合我的代码:
schema1 = StructType([
StructField("computer_name", StringType()),
StructField("owner_id", StringType())])
而且我想将架构移出 json 文件中的脚本,因此我进行了以下转换:
{"StructType":[
{"fields":[
{"metadata":{},"name":"computer_name","nullable":true,"type":"string"},
{"metadata":{},"name":"owner_id","nullable":true,"type":"string"},
"type":"struct"}
]
}
我想读取文件并使用 json 文件提供我自己的架构:
df=spark.read.option("header", "true").schema(schema_json_file).load(file_names)
这将引发以下错误:
ERROR: schema should be StructType or string
我是这样解决问题的:
json 文件:
{"fields":[
{"metadata":{},"name":"computer_name","nullable":true,"type":"string"},
{"metadata":{},"name":"owner_id","nullable":true,"type":"string"}],
"type":"struct"}
Python代码:
schema_file = s3.Object("bucket_name", "prefix")
schema_file = json.loads(schema_file.get()['Body'].read().decode('utf-8'))
custom_schema = StructType.fromJson(schema_file)
df=spark.read.option("header", "true").schema(custom_schema).load(file_names)
在我的例子中,具有架构的 json 文件位于 S3 位置。
问题:
- 我的模式转换为 json 是否正确?
- 如何传递 json 文件以为 spark.read() 提供架构
我将以下模式硬编码到 python 脚本中,这非常适合我的代码:
schema1 = StructType([
StructField("computer_name", StringType()),
StructField("owner_id", StringType())])
而且我想将架构移出 json 文件中的脚本,因此我进行了以下转换:
{"StructType":[
{"fields":[
{"metadata":{},"name":"computer_name","nullable":true,"type":"string"},
{"metadata":{},"name":"owner_id","nullable":true,"type":"string"},
"type":"struct"}
]
}
我想读取文件并使用 json 文件提供我自己的架构:
df=spark.read.option("header", "true").schema(schema_json_file).load(file_names)
这将引发以下错误:
ERROR: schema should be StructType or string
我是这样解决问题的:
json 文件:
{"fields":[
{"metadata":{},"name":"computer_name","nullable":true,"type":"string"},
{"metadata":{},"name":"owner_id","nullable":true,"type":"string"}],
"type":"struct"}
Python代码:
schema_file = s3.Object("bucket_name", "prefix")
schema_file = json.loads(schema_file.get()['Body'].read().decode('utf-8'))
custom_schema = StructType.fromJson(schema_file)
df=spark.read.option("header", "true").schema(custom_schema).load(file_names)
在我的例子中,具有架构的 json 文件位于 S3 位置。