如何自动将 JSON 转换为 Spark 模式?
How to convert JSON to Spark schema automatically?
我有一个很大的 JSON 想在 Spark Structured Streaming 中使用。我不想手动将此 JSON 重新键入为 Spark 架构表达式。我可以自动执行一次吗?
这是我写的
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Infer Schema") \
.getOrCreate()
df = spark \
.read \
.option("multiline", True) \
.json("file_examples/dataflow/row01.json")
df.printSchema()
df.show()
with open("dataflow_schema.json", "w") as fp:
fp.write(df.schema.json())
这样可以吗?
你走在正确的道路上。您可以将您的模式保存为 json,然后稍后加载它。务必先将其转换为 json
,然后再转换为 StructType
,然后再使用
import json
from pyspark.sql.types import StructType
with open("dataflow_schema.json", "r") as fp:
json_schema_str = fp.read()
my_schema = StructType.fromJson(json.loads(json_schema_str))
在结构化流式查询中,如果您有一个 json
列,您可以使用 from_json
方法将您的 json 转换为 struct
类型并最终转换为多个列例如:
from pyspark.sql.functions import from_json,col
# Assume that we have a kafkaStream
kafkaStream.selectExpr("CAST(value as string)")\
.select(from_json(col("value"),my_schema).alias("json_value"))\
.selectExpr("json_value.*") # extract as columns
我有一个很大的 JSON 想在 Spark Structured Streaming 中使用。我不想手动将此 JSON 重新键入为 Spark 架构表达式。我可以自动执行一次吗?
这是我写的
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Infer Schema") \
.getOrCreate()
df = spark \
.read \
.option("multiline", True) \
.json("file_examples/dataflow/row01.json")
df.printSchema()
df.show()
with open("dataflow_schema.json", "w") as fp:
fp.write(df.schema.json())
这样可以吗?
你走在正确的道路上。您可以将您的模式保存为 json,然后稍后加载它。务必先将其转换为 json
,然后再转换为 StructType
,然后再使用
import json
from pyspark.sql.types import StructType
with open("dataflow_schema.json", "r") as fp:
json_schema_str = fp.read()
my_schema = StructType.fromJson(json.loads(json_schema_str))
在结构化流式查询中,如果您有一个 json
列,您可以使用 from_json
方法将您的 json 转换为 struct
类型并最终转换为多个列例如:
from pyspark.sql.functions import from_json,col
# Assume that we have a kafkaStream
kafkaStream.selectExpr("CAST(value as string)")\
.select(from_json(col("value"),my_schema).alias("json_value"))\
.selectExpr("json_value.*") # extract as columns