Flatten Nested JSON String Column Table into tabular format
I am currently trying to flatten data in a Databricks table. Since some columns are deeply nested and are of 'String' type, I cannot use the explode function on them directly.
My current dataframe looks like this:
display(df)
account | applied | applylist | aracct | Internal Id |
---|---|---|---|---|
{"id":"1","name":"ABC","type":null} | 2500.00 | {"apply":[{"total":20.00,"applyDate":"2021-07-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":200.0},{"total":25.00,"applyDate":"2021-07-15T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":25.0}],"replaceAll":false} | {"internalId":"121","name":"CMS","type":null} | 101 |
{"id":"2","name":"DEF","type":null} | 1500.00 | {"apply":[{"total":30.00,"applyDate":"2021-08-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":250.0},{"total":35.00,"applyDate":"2021-09-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":350.0}],"replaceAll":false} | {"internalId":"121","name":"BMS","type":null} | 102 |
My dataframe schema looks like this:
df.printSchema()
- |-- account: string (nullable = true)
- |-- applied: decimal(38,6) (nullable = true)
- |-- applylist: string (nullable = true)
- |-- aracct: string (nullable = true)
How can I flatten the table above and store each record in tabular format rather than nested format?
Expected output:
account.id | account.name | account.type | applied | applylist.apply.total | applylist.apply.applydate | applylist.apply.currency | applylist.apply.apply | applylist.apply.discamount | applylist.apply.line | applylist.apply.type | applylist.apply.amount | applylist.replaceAll |
---|---|---|---|---|---|---|---|---|---|---|---|---|
1 | ABC | null | 2500.00 | 20.00 | 2021-07-13T07:00:00Z | USA | true | null | 0 | Invoice | 200.0 | false |
2 | DEF | null | 1500.00 | 30.00 | 2021-08-13T07:00:00Z | USA | true | null | 0 | Invoice | 250.0 | false |
Here is my Scala code:
import org.apache.spark.sql.functions._
import spark.implicits._
val df = spark.sql("select * from ns_db_integration.transaction")
display(df.select($"applied" as "Applied", $"applylist", explode($"account"))
.withColumn("Account.Id" ,$"col.id")
.withColumn("Account.Name",$"col.name")
.withColumn("Account.Type",$"col.type").drop($"col")
.select($"*",$"applylist.*")
.drop($"applylist")
.select($"*",explode($"apply"))
.drop($"apply")
.withColumn("Total",$"col.total")
.withColumn("ApplyDate",$"col.applyDate")
.drop($"col")
)
This Scala code throws an error: account and applylist are plain string columns, so explode cannot be applied to them directly.
I also tried the json_tuple function in PySpark, which did not work as I expected: all of the applylist column values came back null.
from pyspark.sql.functions import json_tuple,from_json,get_json_object, explode,col
df.select(col("applied"),json_tuple(col("applylist"),"apply.total","apply.applyDate","apply.currency","apply.apply")) \
.toDF("applied","applylist.apply.total","applylist.apply.applyDate","applylist.apply.currency","applylist.apply.apply") \
.show(truncate=False)
Output of the PySpark code: every extracted value is null. json_tuple only matches top-level keys, so a string like "apply.total" is looked up as a literal key name rather than as a path into the nested apply array.
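For reference, get_json_object does accept JSONPath expressions, so it can reach inside the nested apply array where json_tuple cannot. A minimal sketch (the aliases are illustrative; each path reads a single array element, so this does not fully flatten the array):

from pyspark.sql.functions import col, get_json_object

# JSONPath lets get_json_object reach nested fields; "[0]" picks
# only the first element of the apply array.
df.select(
    col("applied"),
    get_json_object(col("applylist"), "$.apply[0].total").alias("total"),
    get_json_object(col("applylist"), "$.apply[0].applyDate").alias("applyDate"),
    get_json_object(col("applylist"), "$.apply[0].currency").alias("currency"),
    get_json_object(col("applylist"), "$.apply[0].apply").alias("apply"),
).show(truncate=False)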
Using PySpark, see the logic below.
Input data:
str1 = """account applied applylist aracct
{"id":"1","name":"ABC","type":null} 2500.00 {"apply":[{"total":20.00,"applyDate":"2021-07-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":200.0}],"replaceAll":false} {"internalId":"121","name":"CMS","type":null}
{"id":"2","name":"DEF","type":null} 1500.00 {"apply":[{"total":30.00,"applyDate":"2021-08-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":250.0}],"replaceAll":false} {"internalId":"121","name":"BMS","type":null}"""
import pandas as pd
from io import StringIO
pdf = pd.read_csv(StringIO(str1), sep = '\t')
df = spark.createDataFrame(pdf)
df.show(truncate=False)
+-----------------------------------+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------+
|account |applied|applylist |aracct |
+-----------------------------------+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------+
|{"id":"1","name":"ABC","type":null}|2500.0 |{"apply":[{"total":20.00,"applyDate":"2021-07-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":200.0}],"replaceAll":false}|{"internalId":"121","name":"CMS","type":null}|
|{"id":"2","name":"DEF","type":null}|1500.0 |{"apply":[{"total":30.00,"applyDate":"2021-08-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":250.0}],"replaceAll":false}|{"internalId":"121","name":"BMS","type":null}|
+-----------------------------------+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------+
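As an aside, the sample string above relies on tab separators surviving copy-paste; a sturdier way to build the same test DataFrame, shown here with just the first row, is to pass the rows straight to spark.createDataFrame:

# build the same test data without pandas: rows plus column names
rows = [
    ('{"id":"1","name":"ABC","type":null}',
     2500.00,
     '{"apply":[{"total":20.00,"applyDate":"2021-07-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":200.0}],"replaceAll":false}',
     '{"internalId":"121","name":"CMS","type":null}'),
]
df = spark.createDataFrame(rows, ["account", "applied", "applylist", "aracct"])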
To get the required output:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# schema for the account JSON string
schema_account = StructType([StructField("id", StringType(), True),
                             StructField("name", StringType(), True),
                             StructField("type", StringType(), True)
                             ])

df1 = (
    df.select(from_json(col("account"), schema_account).alias("account"), "applied",
              # parse applylist as a map of strings; alias it explicitly
              # instead of relying on the auto-generated column name
              from_json(col("applylist"), MapType(StringType(), StringType())).alias("entries"))
      .select("account.*", "applied", "entries.apply", "entries.replaceAll")
      # apply is still a JSON array string: parse it as an array of maps
      .select("id", "name", "type", "applied",
              from_json(col("apply"), ArrayType(MapType(StringType(), StringType()))).alias("apply"),
              "replaceAll")
      # first explode: one row per element of the apply array
      .select("id", "name", "type", "applied", explode("apply").alias("apply"), "replaceAll")
      # second explode: one row per key/value pair of each map; rename type
      # so the account field does not clash with the apply-level type key
      .select("id", "name", col("type").alias("type1"), "applied", explode("apply"), "replaceAll")
      # pivot the key/value pairs back into one column per key
      .groupBy("id", "name", "type1", "applied", "replaceAll").pivot("key").agg(first("value"))
      .withColumnRenamed("id", "account.id")
      .withColumnRenamed("name", "account.name")
      .withColumnRenamed("type1", "account.type")
      .withColumnRenamed("total", "applylist.apply.total")
      .withColumnRenamed("applyDate", "applylist.apply.applyDate")
      .withColumnRenamed("currency", "applylist.apply.currency")
      .withColumnRenamed("apply", "applylist.apply.apply")
      .withColumnRenamed("discAmt", "applylist.apply.discAmt")
      .withColumnRenamed("line", "applylist.apply.line")
      .withColumnRenamed("type", "applylist.apply.type")
      .withColumnRenamed("amount", "applylist.apply.amount")
)
df1.select("`account.id`" ,"`account.name`" ,"`account.type`" ,"applied" ,"`applylist.apply.total`" ,"`applylist.apply.applyDate`" ,"`applylist.apply.currency`" ,"`applylist.apply.apply`" ,"`applylist.apply.discAmt`" ,"`applylist.apply.line`" ,"`applylist.apply.type`" ,"`applylist.apply.amount`" ,"`replaceAll`").show(truncate=False)
+----------+------------+------------+-------+---------------------+-------------------------+------------------------+---------------------+-----------------------+--------------------+--------------------+----------------------+----------+
|account.id|account.name|account.type|applied|applylist.apply.total|applylist.apply.applyDate|applylist.apply.currency|applylist.apply.apply|applylist.apply.discAmt|applylist.apply.line|applylist.apply.type|applylist.apply.amount|replaceAll|
+----------+------------+------------+-------+---------------------+-------------------------+------------------------+---------------------+-----------------------+--------------------+--------------------+----------------------+----------+
|1 |ABC |null |2500.0 |20.0 |2021-07-13T07:00:00Z |USA |true |null |0 |Invoice |200.0 |false |
|2 |DEF |null |1500.0 |30.0 |2021-08-13T07:00:00Z |USA |true |null |0 |Invoice |250.0 |false |
+----------+------------+------------+-------+---------------------+-------------------------+------------------------+---------------------+-----------------------+--------------------+--------------------+----------------------+----------+
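A note on the approach: parsing applylist as a map of strings keeps the code short, but every value comes back as a string and a groupBy/pivot is needed to reassemble the rows. If the applylist layout is fixed, as the sample data suggests, declaring the full schema up front avoids the pivot and preserves the types. A minimal sketch, reusing schema_account from above:

from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import (ArrayType, BooleanType, DoubleType, IntegerType,
                               StringType, StructField, StructType)

# full schema for the applylist JSON string (field names taken from the sample data)
schema_applylist = StructType([
    StructField("apply", ArrayType(StructType([
        StructField("total", DoubleType(), True),
        StructField("applyDate", StringType(), True),
        StructField("currency", StringType(), True),
        StructField("apply", BooleanType(), True),
        StructField("discAmt", DoubleType(), True),
        StructField("line", IntegerType(), True),
        StructField("type", StringType(), True),
        StructField("amount", DoubleType(), True),
    ])), True),
    StructField("replaceAll", BooleanType(), True),
])

flat = (
    df.select(from_json(col("account"), schema_account).alias("account"),
              "applied",
              from_json(col("applylist"), schema_applylist).alias("applylist"))
      # one output row per element of the apply array
      .select("account.id", "account.name", col("account.type").alias("account_type"),
              "applied",
              explode("applylist.apply").alias("apply"),
              col("applylist.replaceAll").alias("replaceAll"))
      # the struct fields are already typed, so no pivot is needed
      .select("id", "name", "account_type", "applied", "apply.*", "replaceAll")
)
flat.show(truncate=False)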