如何自动将 JSON 转换为 Spark 模式?

How to convert JSON to Spark schema automatically?

我有一个很大的 JSON 想在 Spark Structured Streaming 中使用。我不想手动将此 JSON 重新键入为 Spark 架构表达式。我可以自动执行一次吗?


这是我写的

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Infer Schema") \
    .getOrCreate()

df = spark \
    .read \
    .option("multiline", True) \
    .json("file_examples/dataflow/row01.json")

df.printSchema()

df.show()

with open("dataflow_schema.json", "w") as fp:
    fp.write(df.schema.json())

这样可以吗?

你走在正确的道路上。您可以将您的模式保存为 json,然后稍后加载它。务必先将其转换为 json,然后再转换为 StructType,然后再使用

import json
from pyspark.sql.types import StructType

with open("dataflow_schema.json", "r") as fp:
    json_schema_str = fp.read()
    my_schema = StructType.fromJson(json.loads(json_schema_str))

在结构化流式查询中,如果您有一个 json 列,您可以使用 from_json 方法将您的 json 转换为 struct 类型并最终转换为多个列例如:

from pyspark.sql.functions import from_json,col

# Assume that we have a kafkaStream
kafkaStream.selectExpr("CAST(value as string)")\
           .select(from_json(col("value"),my_schema).alias("json_value"))\
           .selectExpr("json_value.*") # extract as columns