来自 JSON 的 Spark DataFrame 用行交换列

Spark DataFrame from JSON swap columns with rows

我有给定的 JSON,取自 HDFS,其中有数千条这样的记录:

    {
      "01": {
        "created": "2020-12-28 02-15-01", 
        "entity_id": "s.m_free", 
        "old_state_id": null, 
        "state": "1498.7"
      }, 
      "02": {
        "created": "2020-12-28 02-15-31", 
        "entity_id": "s.m_free", 
        "old_state_id": 58100, 
        "state": "1498.9"
      }, 
...}

不幸的是,DataFrame 以数千列和 4 行的形式出现,如下所示:

              |                 01 |                   02|..................| 
created       |2020-12-28 02-15-01 |  2020-12-28 02-15-31|..................|
entity_id     |           s.m_free |             s.m_free|..................|
old_state_id  |               null |                58100|..................|
state         |             1498.7 |               1498.9|..................|

我需要它有 4 列和数千条记录:

       |             created| entity_id| old_state_id|  state|
01     | 2020-12-28 02-15-01|  s.m.free|         null| 1498.7|
02     | 2020-12-28 02-15-31|  s.m.free|        58100| 1498.9|

我找到了一个 PySpark 选项,可以使用 Pandas 更改数据框的方向,但由于我必须使用 Scala 完成任务,所以我找不到类似的选项。

还有一种方法可以将名称放在第一列(记录 01、02 等),因为它似乎是 json 文件中值的键。

如果你能帮助我,我会很高兴。

这部分模拟原始dataframe的生成。
类似于这个例子,确保在真实场景中你也使用 option("primitivesAsString",true).
这是为了解决 unpivoting 不匹配类型问题,因为 Spark 默认类型为 null,即字符串。
例如。如果没有 option("primitivesAsString",true),对于 "old_state_id": 58100old_state_id 将被推断为 long,而对于 "old_state_id": null,它将被推断为字符串。

import spark.implicits._

val json_str = """
{
    "01": {
      "created": "2020-12-28 02-15-01", 
      "entity_id": "s.m_free", 
      "old_state_id": null, 
      "state": "1498.7"
    }, 
    "02": {
      "created": "2020-12-28 02-15-31", 
      "entity_id": "s.m_free", 
      "old_state_id": 58100, 
      "state": "1498.9"
    }
}"""

val df = spark.read.option("primitivesAsString",true).json(Seq(json_str).toDS)

df.printSchema()

root
 |-- 01: struct (nullable = true)
 |    |-- created: string (nullable = true)
 |    |-- entity_id: string (nullable = true)
 |    |-- old_state_id: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- 02: struct (nullable = true)
 |    |-- created: string (nullable = true)
 |    |-- entity_id: string (nullable = true)
 |    |-- old_state_id: string (nullable = true)
 |    |-- state: string (nullable = true)

df.show(false)

+---------------------------------------------+----------------------------------------------+
|01                                           |02                                            |
+---------------------------------------------+----------------------------------------------+
|{2020-12-28 02-15-01, s.m_free, null, 1498.7}|{2020-12-28 02-15-31, s.m_free, 58100, 1498.9}|
+---------------------------------------------+----------------------------------------------+

这是数据转换部分,基于stack

df.createOrReplaceTempView("t")
val cols_num = df.columns.size // 2
val cols_names_and_vals = (for (c <- df.columns) yield s"'$c',`$c`").mkString(",") // "'01',`01`,'02',`02`"
val sql_query = s"select id,val.* from (select stack($cols_num,$cols_names_and_vals) as (id,val) from t)" // select id,val.* from (select stack(2,'01',`01`,'02',`02`) as (id,val) from t)
val df_unpivot = spark.sql(sql_query)

df_unpivot.printSchema()

root
 |-- id: string (nullable = true)
 |-- created: string (nullable = true)
 |-- entity_id: string (nullable = true)
 |-- old_state_id: string (nullable = true)
 |-- state: string (nullable = true)

df_unpivot.show(truncate = false)

+---+-------------------+---------+------------+------+
|id |created            |entity_id|old_state_id|state |
+---+-------------------+---------+------------+------+
|01 |2020-12-28 02-15-01|s.m_free |null        |1498.7|
|02 |2020-12-28 02-15-31|s.m_free |58100       |1498.9|
+---+-------------------+---------+------------+------+