Extract Array of Struct from Parquet into multi-value csv Spark Scala

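I am trying to extract an array of structs from Parquet using Spark Scala. The input is a Parquet file and the output is a CSV file. A CSV field can hold "multiple values" separated by "#;", while the CSV itself is delimited by ",". What is the best way to accomplish this?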

Schema

root
 |-- llamaEvent: struct (nullable = true)
 |    |-- activity: struct (nullable = true)
 |    |    |-- Animal: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- time: string (nullable = true)
 |    |    |    |    |-- status: string (nullable = true)
 |    |    |    |    |-- llamaType: string (nullable = true)
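For building test data or documenting the expected input, the schema above can also be written out as an explicit Spark `StructType`. This is a minimal sketch assuming spark-sql is on the classpath; the names `animalType` and `llamaSchema` are illustrative. Parquet files carry their own schema, so passing it via `spark.read.schema(llamaSchema).parquet(...)` is optional.

```scala
import org.apache.spark.sql.types._

// Element type of the Animal array: three nullable string fields.
val animalType = StructType(Seq(
  StructField("time", StringType, nullable = true),
  StructField("status", StringType, nullable = true),
  StructField("llamaType", StringType, nullable = true)
))

// llamaEvent -> activity -> Animal (array<struct>, elements may be null).
val llamaSchema = StructType(Seq(
  StructField("llamaEvent", StructType(Seq(
    StructField("activity", StructType(Seq(
      StructField("Animal", ArrayType(animalType, containsNull = true), nullable = true)
    )), nullable = true)
  )), nullable = true)
))

// Prints the same tree as the schema shown above.
llamaSchema.printTreeString()
```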

Sample input as JSON (the actual input will be Parquet)

{
   "llamaEvent":{
      "activity":{
         "Animal":[
            {
               "time":"5-1-2020",
               "status":"Running",
               "llamaType":"red llama"
            },
            {
               "time":"6-2-2020",
               "status":"Sitting",
               "llamaType":"blue llama"
            }
         ]
      }
   }
}

Desired CSV output

time,status,llamaType
5-1-2020#;6-2-2020,running#;sitting,red llama#;blue llama
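To try the answers locally against real Parquet input, one option is to parse the sample JSON and write it out as Parquet first. This is a sketch assuming a local SparkSession; the temporary directory stands in for the real (unknown) input path.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("llama-parquet-sample").getOrCreate()
import spark.implicits._

// The sample record from the question, as a single JSON line.
val sampleJson =
  """{"llamaEvent":{"activity":{"Animal":[""" +
  """{"time":"5-1-2020","status":"Running","llamaType":"red llama"},""" +
  """{"time":"6-2-2020","status":"Sitting","llamaType":"blue llama"}]}}}"""

// Hypothetical scratch directory standing in for the real Parquet location.
val parquetPath = Files.createTempDirectory("llama").resolve("events").toString

// Parse the JSON and persist it as Parquet.
spark.read.json(Seq(sampleJson).toDS).write.mode("overwrite").parquet(parquetPath)

// Read it back the way the real job would read its input.
val df = spark.read.parquet(parquetPath)
df.printSchema()
```

JSON schema inference orders struct fields alphabetically (llamaType, status, time), so the printed schema differs from the one above in field order only; access by field name is unaffected.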

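Update: Based on some trial and error, I believe a solution like the following may be suitable, depending on the use case. It takes a "shortcut": it grabs the array items, casts them to a string, and then parses out the extraneous characters, which is good enough for some use cases.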

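import org.apache.spark.sql.functions.{col, regexp_replace}
val times = df.select(col("llamaEvent.activity").getItem("Animal").getItem("time").cast("string").as("time"))

Then you can do whatever parsing you want afterwards, for example with regexp_replace (in Spark 3.x the cast renders the array as "[5-1-2020, 6-2-2020]", so strip the brackets and swap the comma separators for "#;"):

times.withColumn("time", regexp_replace(regexp_replace(col("time"), "[\\[\\]]", ""), ",\\s*", "#;"))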
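A variant of the shortcut above (not from the original post) avoids the cast and regex entirely: selecting a field through the array of structs already yields an `array<string>`, and `concat_ws` accepts array arguments. This is a sketch assuming a local SparkSession and the question's sample record; in the real job `df` would come from `spark.read.parquet(...)`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat_ws}

val spark = SparkSession.builder().master("local[1]").appName("llama-concat-ws").getOrCreate()
import spark.implicits._

// The sample record from the question.
val sampleJson =
  """{"llamaEvent":{"activity":{"Animal":[""" +
  """{"time":"5-1-2020","status":"Running","llamaType":"red llama"},""" +
  """{"time":"6-2-2020","status":"Sitting","llamaType":"blue llama"}]}}}"""
val df = spark.read.json(Seq(sampleJson).toDS)

// A field path through array<struct> yields array<string>, and concat_ws
// joins array elements directly, so no cast or regex clean-up is needed.
val attrs = Seq("time", "status", "llamaType")
val multiValued = df.select(attrs.map { a =>
  concat_ws("#;", col(s"llamaEvent.activity.Animal.$a")).as(a)
}: _*)

multiValued.show(false)
```

Because this works row by row, each input row stays its own CSV row, the array order is kept, and no groupBy is needed.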

Several proper solutions using groupBy, explode, and aggregation were also proposed.

This would be a solution for you (in PySpark):

from pyspark.sql import functions as F, types as T

# a_json: the question's sample record as a Python dict
a_json = {"llamaEvent": {"activity": {"Animal": [
    {"time": "5-1-2020", "status": "Running", "llamaType": "red llama"},
    {"time": "6-2-2020", "status": "Sitting", "llamaType": "blue llama"}]}}}

# One string row holding the whole input list; each level below is parsed
# with from_json and exploded, keeping nested values as JSON strings.
df = spark.createDataFrame([(str([a_json]))],T.StringType())

df = df.withColumn('col', F.from_json("value", T.ArrayType(T.StringType())))
df = df.withColumn("col", F.explode("col"))


df = df.withColumn("col", F.from_json("col", T.MapType(T.StringType(), T.StringType())))
df = df.withColumn("llamaEvent", df.col.getItem("llamaEvent"))

df = df.withColumn("llamaEvent", F.from_json("llamaEvent", T.MapType(T.StringType(), T.StringType())))
df = df.select("*", F.explode("llamaEvent").alias("x","y"))

df = df.withColumn("Activity", F.from_json("y", T.MapType(T.StringType(), T.StringType())))
df = df.select("*", F.explode("Activity").alias("x","yy"))

# One row per Animal element, then pull out its three attributes
df = df.withColumn("final_col", F.from_json("yy", T.ArrayType(T.StringType())))
df = df.withColumn("final_col", F.explode("final_col"))
df = df.withColumn("final_col", F.from_json("final_col", T.MapType(T.StringType(), T.StringType())))
df = df.withColumn("time", df.final_col.getItem("time")) \
       .withColumn("status", df.final_col.getItem("status")) \
       .withColumn("llamaType", df.final_col.getItem("llamaType")) \
       .withColumn("agg_col", F.lit("1"))

# Constant key: all rows fall into one group and are joined with "#;"
df_grp = df.groupby("agg_col").agg(
    F.concat_ws("#;", F.collect_list(df.time)).alias("time"),
    F.concat_ws("#;", F.collect_list(df.status)).alias("status"),
    F.concat_ws("#;", F.collect_list(df.llamaType)).alias("llamaType"))


display(df)
+--------------------+--------------------+--------------------+--------+--------------------+--------------------+------+--------------------+--------------------+--------+-------+----------+-------+
|               value|                 col|          llamaEvent|       x|                   y|            Activity|     x|                  yy|           final_col|    time| status| llamaType|agg_col|
+--------------------+--------------------+--------------------+--------+--------------------+--------------------+------+--------------------+--------------------+--------+-------+----------+-------+
|[{'llamaEvent': {...|[llamaEvent -> {"...|[activity -> {"An...|activity|{"Animal":[{"time...|[Animal -> [{"tim...|Animal|[{"time":"5-1-202...|[time -> 5-1-2020...|5-1-2020|Running| red llama|      1|
|[{'llamaEvent': {...|[llamaEvent -> {"...|[activity -> {"An...|activity|{"Animal":[{"time...|[Animal -> [{"tim...|Animal|[{"time":"5-1-202...|[time -> 6-2-2020...|6-2-2020|Sitting|blue llama|      1|
+--------------------+--------------------+--------------------+--------+--------------------+--------------------+------+--------------------+--------------------+--------+-------+----------+-------+
df_grp.show(truncate=False)
+-------+------------------+----------------+---------------------+
|agg_col|time              |status          |llamaType            |
+-------+------------------+----------------+---------------------+
|1      |5-1-2020#;6-2-2020|Running#;Sitting|red llama#;blue llama|
+-------+------------------+----------------+---------------------+
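For comparison, the same explode / collect_list / concat_ws idea can be written in Scala directly on the typed struct column, without the from_json round trips. This hedged sketch changes one thing: instead of the constant `agg_col`, each input row gets an id from `monotonically_increasing_id` before exploding, so several events are not merged into a single CSV row. The two sample records and the `rowId` name are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, concat_ws, explode, monotonically_increasing_id}

val spark = SparkSession.builder().master("local[1]").appName("llama-explode").getOrCreate()
import spark.implicits._

// Two hypothetical events with no id column.
val jsons = Seq(
  """{"llamaEvent":{"activity":{"Animal":[{"time":"5-1-2020","status":"Running","llamaType":"red llama"},{"time":"6-2-2020","status":"Sitting","llamaType":"blue llama"}]}}}""",
  """{"llamaEvent":{"activity":{"Animal":[{"time":"5-2-2020","status":"Running","llamaType":"red llama"},{"time":"6-3-2020","status":"Standing","llamaType":"blue llama"}]}}}"""
)
val df = spark.read.json(jsons.toDS)

// Tag every input row before exploding so the groupBy can rebuild one row per
// event; a constant key such as lit("1") would merge all events into one row.
val exploded = df.
  withColumn("rowId", monotonically_increasing_id()).
  select(col("rowId"), explode(col("llamaEvent.activity.Animal")).as("animal"))

// One "#;"-joined aggregate per attribute.
val attrs = Seq("time", "status", "llamaType")
val aggs = attrs.map(a => concat_ws("#;", collect_list(col(s"animal.$a"))).as(a))

val result = exploded.
  groupBy("rowId").
  agg(aggs.head, aggs.tail: _*).
  drop("rowId")

result.show(false)
```

Note that collect_list makes no promise about element order once rows are shuffled, so the joined values may not always follow the original array order.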

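One approach is to use the SQL function inline to flatten the array of Animal attribute structs, aggregate each attribute with collect_list, and then join the collected values with the specific delimiter using concat_ws.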

Given a DataFrame df with a schema similar to the one you provided, the following transformation produces the desired dataset, dfResult:

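import org.apache.spark.sql.functions.{collect_list, concat_ws, expr}
import spark.implicits._ // for the $"..." column syntax

// One "#;"-joined aggregate per attribute, named after the attribute.
val attribCSVs = List("time", "status", "llamaType").map(
    c => concat_ws("#;", collect_list(c)).as(c)
  )

// inline() turns each struct in the array into a row with one column per field;
// grouping by eventId folds those rows back into one row per event.
val dfResult = df.
  select($"eventId", expr("inline(llamaEvent.activity.Animal)")).
  groupBy("eventId").agg(attribCSVs.head, attribCSVs.tail: _*)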

Note that an event-identifier column, eventId, has been added to the sample JSON data so that the groupBy aggregation has a key to group on.
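To see what the inline step contributes on its own, it can be compared with the more familiar explode followed by expanding the struct into columns. This is a sketch assuming a local SparkSession and a single sample event; it only checks that both forms yield the same flattened rows.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, expr}

val spark = SparkSession.builder().master("local[1]").appName("llama-inline").getOrCreate()
import spark.implicits._

val sampleJson =
  """{"eventId":1,"llamaEvent":{"activity":{"Animal":[""" +
  """{"time":"5-1-2020","status":"Running","llamaType":"red llama"},""" +
  """{"time":"6-2-2020","status":"Sitting","llamaType":"blue llama"}]}}}"""
val df = spark.read.json(Seq(sampleJson).toDS)

// inline: one output row per array element, one column per struct field.
val viaInline = df.select(col("eventId"), expr("inline(llamaEvent.activity.Animal)"))

// The same rows via explode, then expanding the struct into columns.
val viaExplode = df.
  select(col("eventId"), explode(col("llamaEvent.activity.Animal")).as("animal")).
  select("eventId", "animal.*")

viaInline.show(false)
```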

Let's assemble some sample data:

val jsons = Seq(
    """{
        "eventId": 1,
        "llamaEvent":{
            "activity":{
                "Animal":[
                    {
                        "time":"5-1-2020",
                        "status":"Running",
                        "llamaType":"red llama"
                    },
                    {
                        "time":"6-2-2020",
                        "status":"Sitting",
                        "llamaType":"blue llama"
                    }
                ]
            }
        }
    }""",
    """{
        "eventId": 2,
        "llamaEvent":{
            "activity":{
                "Animal":[
                    {
                        "time":"5-2-2020",
                        "status":"Running",
                        "llamaType":"red llama"
                    },
                    {
                        "time":"6-3-2020",
                        "status":"Standing",
                        "llamaType":"blue llama"
                    }
                ]
            }
        }
    }"""
)

val df = spark.read.option("multiLine", true).json(jsons.toDS) 

df.show(false)
+-------+----------------------------------------------------------------------+
|eventId|llamaEvent                                                            |
+-------+----------------------------------------------------------------------+
|1      |{{[{red llama, Running, 5-1-2020}, {blue llama, Sitting, 6-2-2020}]}} |
|2      |{{[{red llama, Running, 5-2-2020}, {blue llama, Standing, 6-3-2020}]}}|
+-------+----------------------------------------------------------------------+

After applying the above transformation, dfResult should look like this:

dfResult.show(false)
+-------+------------------+-----------------+---------------------+
|eventId|time              |status           |llamaType            |
+-------+------------------+-----------------+---------------------+
|1      |5-1-2020#;6-2-2020|Running#;Sitting |red llama#;blue llama|
|2      |5-2-2020#;6-3-2020|Running#;Standing|red llama#;blue llama|
+-------+------------------+-----------------+---------------------+
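The output above keeps the original array order, but collect_list after a groupBy does not guarantee that order once rows are shuffled across partitions. A hedged, order-preserving variant (a swapped-in technique, not the answer's code): posexplode records each element's index, and sort_array over the collected (pos, value) structs restores the array order before concat_ws. It assumes a local SparkSession and the same two sample events.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, concat_ws, posexplode, sort_array, struct}

val spark = SparkSession.builder().master("local[1]").appName("llama-ordered").getOrCreate()
import spark.implicits._

val jsons = Seq(
  """{"eventId":1,"llamaEvent":{"activity":{"Animal":[{"time":"5-1-2020","status":"Running","llamaType":"red llama"},{"time":"6-2-2020","status":"Sitting","llamaType":"blue llama"}]}}}""",
  """{"eventId":2,"llamaEvent":{"activity":{"Animal":[{"time":"5-2-2020","status":"Running","llamaType":"red llama"},{"time":"6-3-2020","status":"Standing","llamaType":"blue llama"}]}}}"""
)
val df = spark.read.json(jsons.toDS)

// posexplode emits each element together with its index in the array.
val withPos = df.select(col("eventId"),
  posexplode(col("llamaEvent.activity.Animal")).as(Seq("pos", "animal")))

// Collect (pos, value) structs, sort them by pos, then keep only the values,
// so the joined string follows the original array order.
val attrs = Seq("time", "status", "llamaType")
val aggs = attrs.map { a =>
  val ordered = sort_array(collect_list(struct(col("pos"), col(s"animal.$a").as("v"))))
  concat_ws("#;", ordered.getField("v")).as(a)
}

val dfResult = withPos.
  groupBy("eventId").
  agg(aggs.head, aggs.tail: _*).
  orderBy("eventId")

dfResult.show(false)
```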

Writing dfResult to a CSV file:

dfResult.write.option("header", true).csv("/path/to/csv")

/*
eventId,time,status,llamaType
1,5-1-2020#;6-2-2020,Running#;Sitting,red llama#;blue llama
2,5-2-2020#;6-3-2020,Running#;Standing,red llama#;blue llama
*/
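Spark writes a directory of part files rather than a single CSV. For small results, a common pattern is `coalesce(1)` before writing, then reading the raw lines back to check the "," / "#;" layout. This is a sketch assuming a local SparkSession; the output directory is a hypothetical temporary path, and `dfResult` is rebuilt from the rows shown above so the block is self-contained.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("llama-csv-write").getOrCreate()
import spark.implicits._

// The dfResult rows shown above, rebuilt directly.
val dfResult = Seq(
  (1L, "5-1-2020#;6-2-2020", "Running#;Sitting", "red llama#;blue llama"),
  (2L, "5-2-2020#;6-3-2020", "Running#;Standing", "red llama#;blue llama")
).toDF("eventId", "time", "status", "llamaType")

// Hypothetical output directory; Spark creates part-*.csv files inside it.
val outDir = Files.createTempDirectory("llama-csv").resolve("out").toString

// coalesce(1) yields a single part file, which only suits small results.
dfResult.coalesce(1).write.mode("overwrite").option("header", true).csv(outDir)

// Read the raw lines back to inspect the delimiters.
val lines = spark.read.text(outDir).as[String].collect()
lines.foreach(println)
```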