Explode JSON in PySpark SQL
I want to explode nested JSON into a CSV file, i.e. parse the nested JSON into rows and columns.
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import Row
df=spark.read.option("multiline","true").json("sample1.json")
df.printSchema()
root
|-- pid: struct (nullable = true)
| |-- Body: struct (nullable = true)
| | |-- Vendor: struct (nullable = true)
| | | |-- RC: struct (nullable = true)
| | | | |-- Updated_From_Date: string (nullable = true)
| | | | |-- Updated_To_Date: string (nullable = true)
| | | |-- RD: struct (nullable = true)
| | | | |-- Supplier: struct (nullable = true)
| | | | | |-- Supplier_Data: struct (nullable = true)
| | | | | | |-- Days: long (nullable = true)
| | | | | | |-- Reference: struct (nullable = true)
| | | | | | | |-- ID: array (nullable = true)
| | | | | | | | |-- element: string (containsNull = true)
| | | | | | |-- Expected: long (nullable = true)
| | | | | | |-- Payments: long (nullable = true)
| | | | | | |-- Approval: struct (nullable = true)
| | | | | | | |-- ID: array (nullable = true)
| | | | | | | | |-- element: string (containsNull = true)
| | | | | | |-- Areas_Changed: struct (nullable = true)
| | | | | | | |-- Alternate_Names: long (nullable = true)
| | | | | | | |-- Attachments: long (nullable = true)
| | | | | | | |-- Classifications: long (nullable = true)
| | | | | | | |-- Contact_Information: long (nullable = true)
My code:
df2 = (df.select(F.explode("pid").alias("pid"))
         .select("pid.*")
         .select(F.explode("Body").alias("Body"))
         .select("Body.*")
         .select(F.explode("Vendor").alias("Vendor"))
         .select("Vendor.*")
         .select(F.explode("RC").alias("RC"))
         .select("RC.*"))
Error:
AnalysisException: cannot resolve 'explode(pid)' due to data type mismatch: input to function explode should be array or map type, not struct
How do I parse the struct fields?
Any help would be appreciated :)
You can only use the explode function on map or array types. To access a struct type, just use the `.` operator.
Assuming you want the columns under RC and RD, the code should look like this:
df.select("pid.Body.Vendor.RC.*", "pid.Body.Vendor.RD.*")