Spark DataFrame from JSON: swap columns with rows
I have the following JSON, taken from HDFS, with thousands of records like these:
{
"01": {
"created": "2020-12-28 02-15-01",
"entity_id": "s.m_free",
"old_state_id": null,
"state": "1498.7"
},
"02": {
"created": "2020-12-28 02-15-31",
"entity_id": "s.m_free",
"old_state_id": 58100,
"state": "1498.9"
},
...}
Unfortunately, the DataFrame comes out with thousands of columns and 4 rows, like this:
| 01 | 02|..................|
created |2020-12-28 02-15-01 | 2020-12-28 02-15-31|..................|
entity_id | s.m_free | s.m_free|..................|
old_state_id | null | 58100|..................|
state | 1498.7 | 1498.9|..................|
I need it to have 4 columns and thousands of records:
| created| entity_id| old_state_id| state|
01 | 2020-12-28 02-15-01| s.m_free| null| 1498.7|
02 | 2020-12-28 02-15-31| s.m_free| 58100| 1498.9|
I found a PySpark option that changes the orientation of the dataframe using Pandas, but since I have to do the task in Scala, I couldn't find a similar option. Is there also a way to put the names (records 01, 02, etc.) into the first column, since they appear to be the keys of the values in the JSON file?
I would be glad if you could help me.
This part simulates generating the original dataframe. As in this example, make sure you also use option("primitivesAsString",true) in the real scenario. It fixes type mismatches when unpivoting, because for a JSON null Spark's default inferred type is string. E.g. without option("primitivesAsString",true), old_state_id would be inferred as long for "old_state_id": 58100, but as string for "old_state_id": null (see the sketch after the output below).
import spark.implicits._
val json_str = """
{
"01": {
"created": "2020-12-28 02-15-01",
"entity_id": "s.m_free",
"old_state_id": null,
"state": "1498.7"
},
"02": {
"created": "2020-12-28 02-15-31",
"entity_id": "s.m_free",
"old_state_id": 58100,
"state": "1498.9"
}
}"""
// primitivesAsString forces every leaf value to string, so both structs get the same type
val df = spark.read.option("primitivesAsString",true).json(Seq(json_str).toDS)
df.printSchema()
root
|-- 01: struct (nullable = true)
| |-- created: string (nullable = true)
| |-- entity_id: string (nullable = true)
| |-- old_state_id: string (nullable = true)
| |-- state: string (nullable = true)
|-- 02: struct (nullable = true)
| |-- created: string (nullable = true)
| |-- entity_id: string (nullable = true)
| |-- old_state_id: string (nullable = true)
| |-- state: string (nullable = true)
df.show(false)
+---------------------------------------------+----------------------------------------------+
|01 |02 |
+---------------------------------------------+----------------------------------------------+
|{2020-12-28 02-15-01, s.m_free, null, 1498.7}|{2020-12-28 02-15-31, s.m_free, 58100, 1498.9}|
+---------------------------------------------+----------------------------------------------+
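For comparison, a minimal sketch of what inference does without the option (df_raw is a name used here for illustration only):
// Hypothetical: same payload, read without primitivesAsString
val df_raw = spark.read.json(Seq(json_str).toDS)
df_raw.printSchema()
// root
//  |-- 01: struct (nullable = true)
//  |    ...
//  |    |-- old_state_id: string (nullable = true)   <- only null seen => string
//  |-- 02: struct (nullable = true)
//  |    ...
//  |    |-- old_state_id: long (nullable = true)     <- 58100 seen => long
// The two struct types differ, so stacking them in the next step would fail.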
This is the data transformation part, based on stack:
df.createOrReplaceTempView("t")
val cols_num = df.columns.size // 2
val cols_names_and_vals = (for (c <- df.columns) yield s"'$c',`$c`").mkString(",") // "'01',`01`,'02',`02`"
// stack(n, k1, v1, ..., kn, vn) emits n rows of (k, v); val.* then expands the struct fields
val sql_query = s"select id,val.* from (select stack($cols_num,$cols_names_and_vals) as (id,val) from t)" // select id,val.* from (select stack(2,'01',`01`,'02',`02`) as (id,val) from t)
val df_unpivot = spark.sql(sql_query)
df_unpivot.printSchema()
root
|-- id: string (nullable = true)
|-- created: string (nullable = true)
|-- entity_id: string (nullable = true)
|-- old_state_id: string (nullable = true)
|-- state: string (nullable = true)
df_unpivot.show(truncate = false)
+---+-------------------+---------+------------+------+
|id |created |entity_id|old_state_id|state |
+---+-------------------+---------+------------+------+
|01 |2020-12-28 02-15-01|s.m_free |null |1498.7|
|02 |2020-12-28 02-15-31|s.m_free |58100 |1498.9|
+---+-------------------+---------+------------+------+
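As a side note, the same unpivot can also be expressed with the DataFrame API using map + explode instead of SQL stack. A minimal sketch against the same df as above (df_alt and kvPairs are our names):
import org.apache.spark.sql.functions.{col, explode, lit, map}
// interleave literal column names with the struct columns: '01', `01`, '02', `02`, ...
val kvPairs = df.columns.flatMap(c => Seq(lit(c), col(s"`$c`")))
val df_alt = df
  .select(explode(map(kvPairs: _*)).as(Seq("id", "val"))) // one row per top-level key
  .select("id", "val.*")                                  // expand the struct into 4 columns
This only works because primitivesAsString made all the struct values share one type, which map requires.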