从 struct Pyspark 数组创建列
Create column from array of struct Pyspark
我对数据处理还很陌生。
我有一个深度嵌套的数据集,大致有这个模式:
|-- col1 : string
|-- col2 : string
|-- col3: struct
| |-- something : string
| |-- elem: array
| | |-- registrationNumber: struct
| | | |-- registrationNumber : string
| | | |-- registrationNumberType : string
| | | |-- registrationCode : int
对于阵列,我会收到这样的东西。请记住,长度是可变的,我可能没有收到任何值或 10 甚至更多
[
{
registrationNumber : 123456789
registrationNumberType : VAT
registrationCode : 1234
},
{
registrationNumber : ABCDERTYU
registrationNumberType : fiscal1
registrationCode : 9876
},
{
registrationNumber : 123456789
registrationNumberType : foo
registrationCode : 8765
}
]
有没有办法将架构转换为:
|-- col1 : string
|-- col2 : string
|-- col3: struct
| |-- something : string
| |-- VAT: string
| |-- fiscal1: string
其中 VAT
和 fiscal1
值是 registrationNumber
值。
我基本上需要得到一个包含 VAT
和 fiscal1
值的列作为列
非常感谢
编辑:
这是 col3
的示例 json
{
"col3": {
"somestring": "xxxxxx",
"registrationNumbers": [
{
'registrationNumber' : 'something',
'registrationNumberType' : 'VAT'
},
{
'registrationNumber' : 'somethingelse',
'registrationNumberType' : 'fiscal1'
},
{
'registrationNumber' : 'something i dont need',
'registrationNumberType' : 'fiscal2'
}
]
}
}
这是我想要的:
{
"col3": {
"somestring": "xxxxxx",
"VAT" : "something"
"fiscal1" : "somethingelse"
}
}
也许我可以,使用数组和主键创建数据框,创建 VAT
和 fiscal1
列以及来自新数据框的 select 数据以输入到列中?
最后使用主键
加入 2 个数据帧
您可以使用 inline
函数分解和扩展 col3.registrationNumbers
数组的结构元素,然后仅过滤 registrationNumberType
的行 VAT
或 fiscal1
和枢轴。转置后,使用转置后的列更新结构列 col3
:
import pyspark.sql.functions as F
exampleJSON = '{"col1":"col1_XX","col2":"col2_XX","col3":{"somestring":"xxxxxx","registrationNumbers":[{"registrationNumber":"something","registrationNumberType":"VAT"},{"registrationNumber":"somethingelse","registrationNumberType":"fiscal1"},{"registrationNumber":"something i dont need","registrationNumberType":"fiscal2"}]}}'
df = spark.read.json(sc.parallelize([exampleJSON]))
df1 = df.selectExpr("*", "inline(col3.registrationNumbers)") \
.filter(F.col("registrationNumberType").isin(["VAT", "fiscal1"])) \
.groupBy("col1", "col2", "col3") \
.pivot("registrationNumberType") \
.agg(F.first("registrationNumber")) \
.withColumn("col3", F.struct(F.col("col3.somestring"), F.col("VAT"), F.col("fiscal1"))) \
.drop("VAT", "fiscal1")
df1.printSchema()
#root
# |-- col1: string (nullable = true)
# |-- col2: string (nullable = true)
# |-- col3: struct (nullable = false)
# | |-- somestring: string (nullable = true)
# | |-- VAT: string (nullable = true)
# | |-- fiscal1: string (nullable = true)
df1.show(truncate=False)
#+-------+-------+----------------------------------+
#|col1 |col2 |col3 |
#+-------+-------+----------------------------------+
#|col1_XX|col2_XX|{xxxxxx, something, somethingelse}|
#+-------+-------+----------------------------------+
我对数据处理还很陌生。 我有一个深度嵌套的数据集,大致有这个模式:
|-- col1 : string
|-- col2 : string
|-- col3: struct
| |-- something : string
| |-- elem: array
| | |-- registrationNumber: struct
| | | |-- registrationNumber : string
| | | |-- registrationNumberType : string
| | | |-- registrationCode : int
对于阵列,我会收到这样的东西。请记住,长度是可变的,我可能没有收到任何值或 10 甚至更多
[
{
registrationNumber : 123456789
registrationNumberType : VAT
registrationCode : 1234
},
{
registrationNumber : ABCDERTYU
registrationNumberType : fiscal1
registrationCode : 9876
},
{
registrationNumber : 123456789
registrationNumberType : foo
registrationCode : 8765
}
]
有没有办法将架构转换为:
|-- col1 : string
|-- col2 : string
|-- col3: struct
| |-- something : string
| |-- VAT: string
| |-- fiscal1: string
其中 VAT
和 fiscal1
值是 registrationNumber
值。
我基本上需要得到一个包含 VAT
和 fiscal1
值的列作为列
非常感谢
编辑:
这是 col3
{
"col3": {
"somestring": "xxxxxx",
"registrationNumbers": [
{
'registrationNumber' : 'something',
'registrationNumberType' : 'VAT'
},
{
'registrationNumber' : 'somethingelse',
'registrationNumberType' : 'fiscal1'
},
{
'registrationNumber' : 'something i dont need',
'registrationNumberType' : 'fiscal2'
}
]
}
}
这是我想要的:
{
"col3": {
"somestring": "xxxxxx",
"VAT" : "something"
"fiscal1" : "somethingelse"
}
}
也许我可以,使用数组和主键创建数据框,创建 VAT
和 fiscal1
列以及来自新数据框的 select 数据以输入到列中?
最后使用主键
您可以使用 inline
函数分解和扩展 col3.registrationNumbers
数组的结构元素,然后仅过滤 registrationNumberType
的行 VAT
或 fiscal1
和枢轴。转置后,使用转置后的列更新结构列 col3
:
import pyspark.sql.functions as F
exampleJSON = '{"col1":"col1_XX","col2":"col2_XX","col3":{"somestring":"xxxxxx","registrationNumbers":[{"registrationNumber":"something","registrationNumberType":"VAT"},{"registrationNumber":"somethingelse","registrationNumberType":"fiscal1"},{"registrationNumber":"something i dont need","registrationNumberType":"fiscal2"}]}}'
df = spark.read.json(sc.parallelize([exampleJSON]))
df1 = df.selectExpr("*", "inline(col3.registrationNumbers)") \
.filter(F.col("registrationNumberType").isin(["VAT", "fiscal1"])) \
.groupBy("col1", "col2", "col3") \
.pivot("registrationNumberType") \
.agg(F.first("registrationNumber")) \
.withColumn("col3", F.struct(F.col("col3.somestring"), F.col("VAT"), F.col("fiscal1"))) \
.drop("VAT", "fiscal1")
df1.printSchema()
#root
# |-- col1: string (nullable = true)
# |-- col2: string (nullable = true)
# |-- col3: struct (nullable = false)
# | |-- somestring: string (nullable = true)
# | |-- VAT: string (nullable = true)
# | |-- fiscal1: string (nullable = true)
df1.show(truncate=False)
#+-------+-------+----------------------------------+
#|col1 |col2 |col3 |
#+-------+-------+----------------------------------+
#|col1_XX|col2_XX|{xxxxxx, something, somethingelse}|
#+-------+-------+----------------------------------+