Flatten Nested JSON String Column Table into Tabular Format

I am currently trying to flatten data in a Databricks table. Since some of the columns are deeply nested and of String type, I couldn't use the explode function on them.
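For example, explode only accepts array or map columns, so calling it directly on one of these JSON string columns fails during analysis. A rough illustration of what I mean, using a one-row stand-in for my table:

from pyspark.sql import functions as F

# a one-row stand-in: applylist is just a JSON string, not an array
sample = spark.createDataFrame(
    [('{"apply":[{"total":20.0}],"replaceAll":false}',)], ["applylist"])

# explode() expects an ArrayType/MapType column, so this raises an
# AnalysisException ("data type mismatch") instead of flattening anything
sample.select(F.explode(F.col("applylist"))).show()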

My current DataFrame looks like this:

display(df)

| account | applied | applylist | aracct | Internal Id |
| --- | --- | --- | --- | --- |
| {"id":"1","name":"ABC","type":null} | 2500.00 | {"apply":[{"total":20.00,"applyDate":"2021-07-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":200.0},{"total":25.00,"applyDate":"2021-07-15T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":25.0}],"replaceAll":false} | {"internalId":"121","name":"CMS","type":null} | 101 |
| {"id":"2","name":"DEF","type":null} | 1500.00 | {"apply":[{"total":30.00,"applyDate":"2021-08-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":250.0},{"total":35.00,"applyDate":"2021-09-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":350.0}],"replaceAll":false} | {"internalId":"121","name":"BMS","type":null} | 102 |

My DataFrame schema looks like this:

df.printSchema()

How can I flatten the table and store each individual record in tabular format rather than nested format?

Expected output:

| account.id | account.name | account.type | applied | applylist.apply.total | applylist.apply.applydate | applylist.apply.currency | applylist.apply.apply | applylist.apply.discamount | applylist.apply.line | applylist.apply.type | applylist.apply.amount | applylist.replaceAll |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 1 | ABC | null | 2500.00 | 20.00 | 2021-07-13T07:00:00Z | USA | true | null | 0 | Invoice | 200.0 | false |
| 2 | DEF | null | 1500.00 | 30.00 | 2021-08-13T07:00:00Z | USA | true | null | 0 | Invoice | 250.0 | false |

Here is my Scala code:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = spark.sql("select * from ns_db_integration.transaction")

display(df.select($"applied" as "Applied", $"applylist", explode($"account"))
        .withColumn("Account.Id" ,$"col.id")
        .withColumn("Account.Name",$"col.name")
        .withColumn("Account.Type",$"col.type").drop($"col")
        .select($"*",$"applylist.*")
        .drop($"applylist")
        .select($"*",explode($"apply"))
        .drop($"apply")
        .withColumn("Total",$"col.total")
        .withColumn("ApplyDate",$"col.applyDate")
        .drop($"col")
       )

Error in Scala Code

I also tried the json_tuple function in PySpark, but it did not work as I expected: all of the applylist column values came back null.

from pyspark.sql.functions import json_tuple, from_json, get_json_object, explode, col

df.select(col("applied"), json_tuple(col("applylist"), "apply.total", "apply.applyDate", "apply.currency", "apply.apply")) \
    .toDF("applied", "applylist.apply.total", "applylist.apply.applyDate", "applylist.apply.currency", "applylist.apply.apply") \
    .show(truncate=False)

Output of Pyspark Code
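My guess is that json_tuple only extracts top-level keys, so nested paths such as apply.total come back as null; something like get_json_object with full JSON paths would presumably be needed instead. A rough sketch (the paths are my own assumption, and each path only reaches the first element of the apply array, so it still does not flatten the whole list):

from pyspark.sql.functions import col, get_json_object

# get_json_object takes a JSON path, so nested fields are reachable,
# but each path returns a single value (here: only the first apply entry)
df.select(
    col("applied"),
    get_json_object(col("applylist"), "$.apply[0].total").alias("applylist.apply.total"),
    get_json_object(col("applylist"), "$.apply[0].applyDate").alias("applylist.apply.applyDate"),
    get_json_object(col("applylist"), "$.apply[0].currency").alias("applylist.apply.currency"),
    get_json_object(col("applylist"), "$.apply[0].apply").alias("applylist.apply.apply"),
).show(truncate=False)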

Using PySpark, see the logic below.

Input data

str1 = """account   applied applylist   aracct
{"id":"1","name":"ABC","type":null} 2500.00 {"apply":[{"total":20.00,"applyDate":"2021-07-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":200.0}],"replaceAll":false} {"internalId":"121","name":"CMS","type":null}
{"id":"2","name":"DEF","type":null} 1500.00 {"apply":[{"total":30.00,"applyDate":"2021-08-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":250.0}],"replaceAll":false} {"internalId":"121","name":"BMS","type":null}"""

import pandas as pd
from io import StringIO

# str1 above is tab-separated, hence sep='\t'
pdf = pd.read_csv(StringIO(str1), sep='\t')
df = spark.createDataFrame(pdf)
df.show(truncate=False)

+-----------------------------------+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------+
|account                            |applied|applylist                                                                                                                                                              |aracct                                       |
+-----------------------------------+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------+
|{"id":"1","name":"ABC","type":null}|2500.0 |{"apply":[{"total":20.00,"applyDate":"2021-07-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":200.0}],"replaceAll":false}|{"internalId":"121","name":"CMS","type":null}|
|{"id":"2","name":"DEF","type":null}|1500.0 |{"apply":[{"total":30.00,"applyDate":"2021-08-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":250.0}],"replaceAll":false}|{"internalId":"121","name":"BMS","type":null}|
+-----------------------------------+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------+

Required output

from pyspark.sql.functions import *
from pyspark.sql.types import *

schema_account = StructType([StructField("id", StringType(), True),
                             StructField("name", StringType(), True),
                             StructField("type", StringType(), True)
                            ])

df1 = (
  df
    # parse the JSON strings: account with an explicit struct schema, applylist as a
    # map of string -> string (aliased explicitly so the next select can reference it,
    # since the auto-generated column name differs across Spark versions)
    .select(from_json(col("account"), schema_account).alias("account"),
            "applied",
            from_json(col("applylist"), MapType(StringType(), StringType())).alias("entries"))
    .select("account.*", "applied", "entries.apply", "entries.replaceAll")
    # "apply" is still a JSON string holding an array -> parse it as an array of maps
    .select("id", "name", "type", "applied",
            from_json(col("apply"), ArrayType(MapType(StringType(), StringType()))).alias("apply"),
            "replaceAll")
    # one row per element of the apply array
    .select("id", "name", "type", "applied", explode("apply").alias("apply"), "replaceAll")
    # explode each map into key/value rows; rename "type" so it does not clash with the pivoted "type" key
    .select("id", "name", col("type").alias("type1"), "applied", explode("apply"), "replaceAll")
    # pivot the key/value pairs back into columns (first() keeps one value per key and group)
    .groupBy("id", "name", "type1", "applied", "replaceAll").pivot("key").agg(first("value"))
    .withColumnRenamed("id", "account.id")
    .withColumnRenamed("name", "account.name")
    .withColumnRenamed("type1", "account.type")
    .withColumnRenamed("total", "applylist.apply.total")
    .withColumnRenamed("applyDate", "applylist.apply.applyDate")
    .withColumnRenamed("currency", "applylist.apply.currency")
    .withColumnRenamed("apply", "applylist.apply.apply")
    .withColumnRenamed("discAmt", "applylist.apply.discAmt")
    .withColumnRenamed("line", "applylist.apply.line")
    .withColumnRenamed("type", "applylist.apply.type")
    .withColumnRenamed("amount", "applylist.apply.amount")
)

df1.select("`account.id`" ,"`account.name`" ,"`account.type`" ,"applied" ,"`applylist.apply.total`" ,"`applylist.apply.applyDate`" ,"`applylist.apply.currency`" ,"`applylist.apply.apply`" ,"`applylist.apply.discAmt`" ,"`applylist.apply.line`" ,"`applylist.apply.type`" ,"`applylist.apply.amount`" ,"`replaceAll`").show(truncate=False)

+----------+------------+------------+-------+---------------------+-------------------------+------------------------+---------------------+-----------------------+--------------------+--------------------+----------------------+----------+
|account.id|account.name|account.type|applied|applylist.apply.total|applylist.apply.applyDate|applylist.apply.currency|applylist.apply.apply|applylist.apply.discAmt|applylist.apply.line|applylist.apply.type|applylist.apply.amount|replaceAll|
+----------+------------+------------+-------+---------------------+-------------------------+------------------------+---------------------+-----------------------+--------------------+--------------------+----------------------+----------+
|1         |ABC         |null        |2500.0 |20.0                 |2021-07-13T07:00:00Z     |USA                     |true                 |null                   |0                   |Invoice             |200.0                 |false     |
|2         |DEF         |null        |1500.0 |30.0                 |2021-08-13T07:00:00Z     |USA                     |true                 |null                   |0                   |Invoice             |250.0                 |false     |
+----------+------------+------------+-------+---------------------+-------------------------+------------------------+---------------------+-----------------------+--------------------+--------------------+----------------------+----------+
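If you would rather skip the MapType/pivot round trip, roughly the same flattening can be sketched with an explicit schema for applylist. The sketch below reuses df and schema_account from above; the field types in schema_applylist are assumptions inferred from the sample data.

from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import (ArrayType, BooleanType, DoubleType, IntegerType,
                               StringType, StructField, StructType)

# assumed schema for the applylist JSON string, based on the sample records
schema_applylist = StructType([
    StructField("apply", ArrayType(StructType([
        StructField("total", DoubleType(), True),
        StructField("applyDate", StringType(), True),
        StructField("currency", StringType(), True),
        StructField("apply", BooleanType(), True),
        StructField("discAmt", DoubleType(), True),
        StructField("line", IntegerType(), True),
        StructField("type", StringType(), True),
        StructField("amount", DoubleType(), True),
    ])), True),
    StructField("replaceAll", BooleanType(), True),
])

df2 = (
    df.select(from_json(col("account"), schema_account).alias("account"),
              "applied",
              from_json(col("applylist"), schema_applylist).alias("applylist"))
      # one row per element of applylist.apply, keeping typed columns throughout
      .select("account.*", "applied",
              explode("applylist.apply").alias("apply"),
              col("applylist.replaceAll").alias("replaceAll"))
      .select(col("id").alias("account.id"),
              col("name").alias("account.name"),
              col("type").alias("account.type"),
              "applied",
              col("apply.total").alias("applylist.apply.total"),
              col("apply.applyDate").alias("applylist.apply.applyDate"),
              col("apply.currency").alias("applylist.apply.currency"),
              col("apply.apply").alias("applylist.apply.apply"),
              col("apply.discAmt").alias("applylist.apply.discAmt"),
              col("apply.line").alias("applylist.apply.line"),
              col("apply.type").alias("applylist.apply.type"),
              col("apply.amount").alias("applylist.apply.amount"),
              "replaceAll")
)

df2.show(truncate=False)

The trade-off is writing the schema out by hand, but the numeric and boolean fields keep their types and no groupBy/pivot step is needed.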