从 struct Pyspark 数组创建列

Create column from array of struct Pyspark

我对数据处理还很陌生。 我有一个深度嵌套的数据集,大致有这个模式:

 |-- col1 : string
 |-- col2 : string
 |-- col3: struct
 |    |-- something : string
 |    |-- elem: array
 |    |    |-- registrationNumber: struct
 |    |    |     |-- registrationNumber : string
 |    |    |     |-- registrationNumberType : string
 |    |    |     |-- registrationCode : int

对于阵列,我会收到这样的东西。请记住,长度是可变的,我可能没有收到任何值或 10 甚至更多

[
  {
    registrationNumber : 123456789
    registrationNumberType : VAT
    registrationCode : 1234
  },
  {
    registrationNumber : ABCDERTYU
    registrationNumberType : fiscal1
    registrationCode : 9876
  },
  {
    registrationNumber : 123456789
    registrationNumberType : foo
    registrationCode : 8765
  }
]

有没有办法将架构转换为:

 |-- col1 : string
 |-- col2 : string
 |-- col3: struct
 |    |-- something : string
 |    |-- VAT: string
 |    |-- fiscal1: string 

其中 VATfiscal1 值是 registrationNumber 值。 我基本上需要得到一个包含 VATfiscal1 值的列作为列

非常感谢

编辑:

这是 col3

的示例 json
{
        "col3": {
            "somestring": "xxxxxx",
            "registrationNumbers": [
              {
                'registrationNumber' : 'something',
                'registrationNumberType' : 'VAT'
              },
              {
                'registrationNumber' : 'somethingelse',
                'registrationNumberType' : 'fiscal1'
              },
              {
                'registrationNumber' : 'something i dont need',
                'registrationNumberType' : 'fiscal2'
              }
            ]
        }
}

这是我想要的:

{
        "col3": {
            "somestring": "xxxxxx",
            "VAT" : "something"
            "fiscal1" : "somethingelse"
        }
}

也许我可以,使用数组和主键创建数据框,创建 VATfiscal1 列以及来自新数据框的 select 数据以输入到列中? 最后使用主键

加入 2 个数据帧

您可以使用 inline 函数分解和扩展 col3.registrationNumbers 数组的结构元素,然后仅过滤 registrationNumberType 的行 VATfiscal1 和枢轴。转置后,使用转置后的列更新结构列 col3

import pyspark.sql.functions as F

exampleJSON = '{"col1":"col1_XX","col2":"col2_XX","col3":{"somestring":"xxxxxx","registrationNumbers":[{"registrationNumber":"something","registrationNumberType":"VAT"},{"registrationNumber":"somethingelse","registrationNumberType":"fiscal1"},{"registrationNumber":"something i dont need","registrationNumberType":"fiscal2"}]}}'
df = spark.read.json(sc.parallelize([exampleJSON]))

df1 = df.selectExpr("*", "inline(col3.registrationNumbers)") \
    .filter(F.col("registrationNumberType").isin(["VAT", "fiscal1"])) \
    .groupBy("col1", "col2", "col3") \
    .pivot("registrationNumberType") \
    .agg(F.first("registrationNumber")) \
    .withColumn("col3", F.struct(F.col("col3.somestring"), F.col("VAT"), F.col("fiscal1"))) \
    .drop("VAT", "fiscal1")

df1.printSchema()
#root
# |-- col1: string (nullable = true)
# |-- col2: string (nullable = true)
# |-- col3: struct (nullable = false)
# |    |-- somestring: string (nullable = true)
# |    |-- VAT: string (nullable = true)
# |    |-- fiscal1: string (nullable = true)

df1.show(truncate=False)
#+-------+-------+----------------------------------+
#|col1   |col2   |col3                              |
#+-------+-------+----------------------------------+
#|col1_XX|col2_XX|{xxxxxx, something, somethingelse}|
#+-------+-------+----------------------------------+