Extract Array of Struct from Parquet into multi-value csv Spark Scala
I am trying to extract an array of structs from Parquet using Spark Scala. The input is a Parquet file; the output is a CSV file. A CSV field can hold "multi-values" separated by "#;", and the CSV itself is comma-delimited. What is the best way to achieve this?
Schema
root
|-- llamaEvent: struct (nullable = true)
| |-- activity: struct (nullable = true)
| | |-- Animal: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- time: string (nullable = true)
| | | | |-- status: string (nullable = true)
| | | | |-- llamaType: string (nullable = true)
Sample input shown as JSON (the actual input will be Parquet):
{
"llamaEvent":{
"activity":{
"Animal":[
{
"time":"5-1-2020",
"status":"Running",
"llamaType":"red llama"
},
{
"time":"6-2-2020",
"status":"Sitting",
"llamaType":"blue llama"
}
]
}
}
}
Desired CSV output
time,status,llamaType
5-1-2020#;6-2-2020,Running#;Sitting,red llama#;blue llama
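In other words, each output column gathers the values of that attribute across all Animal entries, joined by "#;", and the columns themselves are joined by ",". A minimal plain-Python sketch (no Spark needed) of that collapse, using the sample records above:

```python
# Each column gathers the values of all Animal entries, joined by "#;";
# the columns of the single output row are then joined by ",".
animals = [
    {"time": "5-1-2020", "status": "Running", "llamaType": "red llama"},
    {"time": "6-2-2020", "status": "Sitting", "llamaType": "blue llama"},
]
columns = ["time", "status", "llamaType"]
row = ",".join("#;".join(a[c] for a in animals) for c in columns)
print(",".join(columns))
print(row)
```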
Update:
Based on some trial and error, I believe a solution like the following may be appropriate depending on the use case. It takes a "shortcut" by grabbing the array item, casting it to a string, and then parsing out the extraneous characters, which is useful for some use cases.
df.select(col("llamaEvent.activity").getItem("Animal").getItem("time").cast("String"))
Then you can do whatever parsing you want afterwards, for example with regexp_replace:
df.withColumn("time", regexp_replace(col("time"), ",", "#;"))
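One caveat with this stringify-and-replace shortcut: it rewrites every comma, including any comma that appears inside a field value, so it is only safe when the values are known to be comma-free. A plain-Python illustration of the same replacement:

```python
import re

# Casting the array to a string yields comma-separated items; replacing
# "," with "#;" works on clean values...
clean = "5-1-2020,6-2-2020"
print(re.sub(",", "#;", clean))   # 5-1-2020#;6-2-2020

# ...but a value that itself contains a comma gets split as well,
# so the "#;" delimiter ends up inside a value.
risky = "red, big llama,blue llama"
print(re.sub(",", "#;", risky))
```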
Several proper solutions were also proposed that use groupBy, explode, and aggregate.
Here is a solution that may work for you:
# a_json is the sample JSON record shown above, loaded as a Python dict
from pyspark.sql import functions as F, types as T

df = spark.createDataFrame([(str([a_json]))], T.StringType())
# Peel the nested JSON apart layer by layer: string -> array -> map -> ...
df = df.withColumn("col", F.from_json("value", T.ArrayType(T.StringType())))
df = df.withColumn("col", F.explode("col"))
df = df.withColumn("col", F.from_json("col", T.MapType(T.StringType(), T.StringType())))
df = df.withColumn("llamaEvent", df.col.getItem("llamaEvent"))
df = df.withColumn("llamaEvent", F.from_json("llamaEvent", T.MapType(T.StringType(), T.StringType())))
df = df.select("*", F.explode("llamaEvent").alias("x", "y"))
df = df.withColumn("Activity", F.from_json("y", T.MapType(T.StringType(), T.StringType())))
df = df.select("*", F.explode("Activity").alias("x", "yy"))
df = df.withColumn("final_col", F.from_json("yy", T.ArrayType(T.StringType())))
df = df.withColumn("final_col", F.explode("final_col"))
df = df.withColumn("final_col", F.from_json("final_col", T.MapType(T.StringType(), T.StringType())))
df = (df.withColumn("time", df.final_col.getItem("time"))
        .withColumn("status", df.final_col.getItem("status"))
        .withColumn("llamaType", df.final_col.getItem("llamaType"))
        .withColumn("agg_col", F.lit("1")))
# Group on the constant agg_col and join each attribute list with "#;"
df_grp = df.groupby("agg_col").agg(
    F.concat_ws("#;", F.collect_list(df.time)).alias("time"),
    F.concat_ws("#;", F.collect_list(df.status)).alias("status"),
    F.concat_ws("#;", F.collect_list(df.llamaType)).alias("llamaType"))
display(df)
+--------------------+--------------------+--------------------+--------+--------------------+--------------------+------+--------------------+--------------------+--------+-------+----------+-------+
| value| col| llamaEvent| x| y| Activity| x| yy| final_col| time| status| llamaType|agg_col|
+--------------------+--------------------+--------------------+--------+--------------------+--------------------+------+--------------------+--------------------+--------+-------+----------+-------+
|[{'llamaEvent': {...|[llamaEvent -> {"...|[activity -> {"An...|activity|{"Animal":[{"time...|[Animal -> [{"tim...|Animal|[{"time":"5-1-202...|[time -> 5-1-2020...|5-1-2020|Running| red llama| 1|
|[{'llamaEvent': {...|[llamaEvent -> {"...|[activity -> {"An...|activity|{"Animal":[{"time...|[Animal -> [{"tim...|Animal|[{"time":"5-1-202...|[time -> 6-2-2020...|6-2-2020|Sitting|blue llama| 1|
+--------------------+--------------------+--------------------+--------+--------------------+--------------------+------+--------------------+--------------------+--------+-------+----------+-------+
df_grp.show(truncate=False)
+-------+------------------+----------------+---------------------+
|agg_col|time |status |llamaType |
+-------+------------------+----------------+---------------------+
|1 |5-1-2020#;6-2-2020|Running#;Sitting|red llama#;blue llama|
+-------+------------------+----------------+---------------------+
One approach is to use the SQL function inline to flatten the array of Animal attribute structs, and collect_list to aggregate the attributes with the specific delimiter. Given a DataFrame df with a schema similar to the one you provided, the following transformation will generate the desired dataset, dfResult:
import org.apache.spark.sql.functions.{collect_list, concat_ws, expr}
import spark.implicits._

val attribCSVs = List("time", "status", "llamaType").map(
  c => concat_ws("#;", collect_list(c)).as(c)
)

val dfResult = df.
  select($"eventId", expr("inline(llamaEvent.activity.Animal)")).
  groupBy("eventId").agg(attribCSVs.head, attribCSVs.tail: _*)
Note that an event-identifying column eventId has been added to the sample JSON data for the necessary groupBy aggregation.
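For intuition, the flatten-then-aggregate shape of this pipeline can be mimicked in plain Python (no Spark), assuming records keyed by eventId like the sample data below:

```python
from collections import defaultdict

# Mimic inline(llamaEvent.activity.Animal): one flat row per array element,
# keyed by eventId; then group and "#;"-join each attribute, which is what
# collect_list + concat_ws do in the Spark version.
events = [
    (1, [{"time": "5-1-2020", "status": "Running", "llamaType": "red llama"},
         {"time": "6-2-2020", "status": "Sitting", "llamaType": "blue llama"}]),
]
flat = [(eid, a) for eid, animals in events for a in animals]  # inline step

grouped = defaultdict(lambda: defaultdict(list))
for eid, a in flat:                                            # groupBy step
    for c in ("time", "status", "llamaType"):
        grouped[eid][c].append(a[c])

result = {eid: {c: "#;".join(vals) for c, vals in cols.items()}
          for eid, cols in grouped.items()}
print(result[1])
```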
Let's assemble some sample data:
val jsons = Seq(
"""{
"eventId": 1,
"llamaEvent":{
"activity":{
"Animal":[
{
"time":"5-1-2020",
"status":"Running",
"llamaType":"red llama"
},
{
"time":"6-2-2020",
"status":"Sitting",
"llamaType":"blue llama"
}
]
}
}
}""",
"""{
"eventId": 2,
"llamaEvent":{
"activity":{
"Animal":[
{
"time":"5-2-2020",
"status":"Running",
"llamaType":"red llama"
},
{
"time":"6-3-2020",
"status":"Standing",
"llamaType":"blue llama"
}
]
}
}
}"""
)
val df = spark.read.option("multiLine", true).json(jsons.toDS)
df.show(false)
+-------+----------------------------------------------------------------------+
|eventId|llamaEvent |
+-------+----------------------------------------------------------------------+
|1 |{{[{red llama, Running, 5-1-2020}, {blue llama, Sitting, 6-2-2020}]}} |
|2 |{{[{red llama, Running, 5-2-2020}, {blue llama, Standing, 6-3-2020}]}}|
+-------+----------------------------------------------------------------------+
Applying the above transformation, dfResult should look like this:
dfResult.show(false)
+-------+------------------+-----------------+---------------------+
|eventId|time |status |llamaType |
+-------+------------------+-----------------+---------------------+
|1 |5-1-2020#;6-2-2020|Running#;Sitting |red llama#;blue llama|
|2 |5-2-2020#;6-3-2020|Running#;Standing|red llama#;blue llama|
+-------+------------------+-----------------+---------------------+
Writing dfResult out as CSV:
dfResult.write.option("header", true).csv("/path/to/csv")
/*
eventId,time,status,llamaType
1,5-1-2020#;6-2-2020,Running#;Sitting,red llama#;blue llama
2,5-2-2020#;6-3-2020,Running#;Standing,red llama#;blue llama
*/
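Downstream consumers can recover the individual Animal records by splitting each field on "#;" again. A plain-Python reader sketch (the inline CSV text stands in for the files written above):

```python
import csv
import io

# Parse the multi-value CSV: "," separates columns, "#;" separates the
# values packed inside one column; re-zip them back into per-animal dicts.
csv_text = """eventId,time,status,llamaType
1,5-1-2020#;6-2-2020,Running#;Sitting,red llama#;blue llama
"""
rows = list(csv.DictReader(io.StringIO(csv_text)))
cols = ("time", "status", "llamaType")
animals = [dict(zip(cols, vals))
           for vals in zip(*(rows[0][c].split("#;") for c in cols))]
print(animals)
```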