Pyspark 解析高度嵌套 json (Prometheus)
Pyspark parse highly nested json (Prometheus)
我是 PySpark 的新手,希望能得到一些使用 PySpark-SQL 解析嵌套 JSON 数据的帮助。数据具有以下架构:
架构
root
|-- data: struct (nullable = true)
| |-- result: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- metric: struct (nullable = true)
| | | | |-- data0: string (nullable = true)
| | | | |-- data1: string (nullable = true)
| | | | |-- data2: string (nullable = true)
| | | | |-- data3: string (nullable = true)
| | | |-- values: array (nullable = true)
| | | | |-- element: array (containsNull = true)
| | | | | |-- element: string (containsNull = true)
| |-- resultType: string (nullable = true)
|-- status: string (nullable = true)
这是 JSON 文件(输入)的示例:
{"status":"success",
"data":{"resultType":"matrix","result":
[{"metric":{"data0":"T","data1":"O"},"values":[[90,"0"],[80, "0"]] },
{"metric":{"data0":"K","data1":"S"},"values":[[70,"0"],[60, "0"]]} ,
{"metric":{"data2":"J","data3":"O"},"values":[[50,"0"],[40, "0"]]} ]}}
我的目标 我基本上想将数据放入以下数据框中:
1-
data0 | data1 | data2 | data3 |values
示例输出数据帧:
data0 | data1 | data2 | data3 | values
"T" | "O" | nan | nan| [90,"0"],[80, "0"]
"K" | "S" | nan | nan| [70,"0"],[60, "0"]
nan | nan | "J" | "O"| [50,"0"],[40, "0"]
2-
time | value | data0 | data1 | data2 | data3
示例输出数据帧
time | value |data0 | data1 | data2 | data3
90 | "0" | "T"| "O"| nan | nan
80 | "0" | "T"| "O"| nan | nan
70 | "0" | "K"| "S"| nan | nan
60 | "0" | "K"| "S"| nan | nan
50 | "0" | nan| nan| "J" | "O"
40 | "0" | nan| nan| "J" | "O"
此外,如果有任何方法可以使用 spark 的并行能力来加速此过程,那会很棒,因为解析的 json 文件以千兆字节为单位。
要获取第一个数据帧,您可以使用:
df = (
df.withColumn("data0", F.expr("transform(data.result, x -> x.metric.data0)"))
.withColumn("data1", F.expr("transform(data.result, x -> x.metric.data1)"))
.withColumn("data2", F.expr("transform(data.result, x -> x.metric.data2)"))
.withColumn("data3", F.expr("transform(data.result, x -> x.metric.data3)"))
.withColumn("values", F.expr("transform(data.result, x -> x.values)"))
.withColumn("items", F.array(F.lit(0), F.lit(1), F.lit(2)))
.withColumn("items", F.explode(F.col("items")))
.withColumn("data0", F.col("data0").getItem(F.col("items")))
.withColumn("data1", F.col("data1").getItem(F.col("items")))
.withColumn("data2", F.col("data2").getItem(F.col("items")))
.withColumn("data3", F.col("data3").getItem(F.col("items")))
.withColumn("values", F.col("values").getItem(F.col("items")))
.drop("data", "status", "items")
)
结果:
root
|-- data0: string (nullable = true)
|-- data1: string (nullable = true)
|-- data2: string (nullable = true)
|-- data3: string (nullable = true)
|-- values: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
+-----+-----+-----+-----+------------------+
|data0|data1|data2|data3|values |
+-----+-----+-----+-----+------------------+
|T |O |null |null |[[90, 0], [80, 0]]|
|K |S |null |null |[[70, 0], [60, 0]]|
|null |null |J |O |[[50, 0], [40, 0]]|
+-----+-----+-----+-----+------------------+
要获得第二个数据帧,方法相同,只是需要对 values 列额外执行一次 explode:
df = (
df.withColumn("data0", F.expr("transform(data.result, x -> x.metric.data0)"))
.withColumn("data1", F.expr("transform(data.result, x -> x.metric.data1)"))
.withColumn("data2", F.expr("transform(data.result, x -> x.metric.data2)"))
.withColumn("data3", F.expr("transform(data.result, x -> x.metric.data3)"))
.withColumn("values", F.expr("transform(data.result, x -> x.values)"))
.withColumn("items", F.array(F.lit(0), F.lit(1), F.lit(2)))
.withColumn("items", F.explode(F.col("items")))
.withColumn("data0", F.col("data0").getItem(F.col("items")))
.withColumn("data1", F.col("data1").getItem(F.col("items")))
.withColumn("data2", F.col("data2").getItem(F.col("items")))
.withColumn("data3", F.col("data3").getItem(F.col("items")))
.withColumn("values", F.col("values").getItem(F.col("items")))
.withColumn("values", F.explode("values"))
.withColumn("time", F.col("values").getItem(0))
.withColumn("value", F.col("values").getItem(1))
.drop("data", "status", "items", "values")
)
结果:
root
|-- data0: string (nullable = true)
|-- data1: string (nullable = true)
|-- data2: string (nullable = true)
|-- data3: string (nullable = true)
|-- time: string (nullable = true)
|-- value: string (nullable = true)
+-----+-----+-----+-----+----+-----+
|data0|data1|data2|data3|time|value|
+-----+-----+-----+-----+----+-----+
|T |O |null |null |90 |0 |
|T |O |null |null |80 |0 |
|K |S |null |null |70 |0 |
|K |S |null |null |60 |0 |
|null |null |J |O |50 |0 |
|null |null |J |O |40 |0 |
+-----+-----+-----+-----+----+-----+
- 更新:
自动化 data
名称和结果数量的示例:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import json
data_names = []
number_of_results = 0
with open("test.json", "r") as f_in:
raw_data = json.load(f_in)
for item in raw_data["data"]["result"]:
number_of_results += 1
for key in item["metric"].keys():
if key not in data_names:
data_names.append(key)
spark = SparkSession.builder.getOrCreate()
df = spark.read.option("multiline", True).json("test.json")
for data_name in data_names:
df = df.withColumn(
data_name, F.expr(f"transform(data.result, x -> x.metric.{data_name})")
)
df = (
df.withColumn("values", F.expr("transform(data.result, x -> x.values)"))
.withColumn("items", F.array(*[F.lit(x) for x in range(0, number_of_results)]))
.withColumn("items", F.explode(F.col("items")))
)
for data_name in data_names:
df = df.withColumn(data_name, F.col(data_name).getItem(F.col("items")))
df = df.withColumn("values", F.col("values").getItem(F.col("items"))).drop(
"data", "status", "items"
)
结果是第一个dataframe(同上)
我是 PySpark 的新手,希望能得到一些使用 PySpark-SQL 解析嵌套 JSON 数据的帮助。数据具有以下架构:
架构
root
|-- data: struct (nullable = true)
| |-- result: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- metric: struct (nullable = true)
| | | | |-- data0: string (nullable = true)
| | | | |-- data1: string (nullable = true)
| | | | |-- data2: string (nullable = true)
| | | | |-- data3: string (nullable = true)
| | | |-- values: array (nullable = true)
| | | | |-- element: array (containsNull = true)
| | | | | |-- element: string (containsNull = true)
| |-- resultType: string (nullable = true)
|-- status: string (nullable = true)
这是 JSON 文件(输入)的示例:
{"status":"success",
"data":{"resultType":"matrix","result":
[{"metric":{"data0":"T","data1":"O"},"values":[[90,"0"],[80, "0"]] },
{"metric":{"data0":"K","data1":"S"},"values":[[70,"0"],[60, "0"]]} ,
{"metric":{"data2":"J","data3":"O"},"values":[[50,"0"],[40, "0"]]} ]}}
我的目标 我基本上想将数据放入以下数据框中:
1-
data0 | data1 | data2 | data3 |values
示例输出数据帧:
data0 | data1 | data2 | data3 | values
"T" | "O" | nan | nan| [90,"0"],[80, "0"]
"K" | "S" | nan | nan| [70,"0"],[60, "0"]
nan | nan | "J" | "O"| [50,"0"],[40, "0"]
2-
time | value | data0 | data1 | data2 | data3
示例输出数据帧
time | value |data0 | data1 | data2 | data3
90 | "0" | "T"| "O"| nan | nan
80 | "0" | "T"| "O"| nan | nan
70 | "0" | "K"| "S"| nan | nan
60 | "0" | "K"| "S"| nan | nan
50 | "0" | nan| nan| "J" | "O"
40 | "0" | nan| nan| "J" | "O"
此外,如果有任何方法可以使用 spark 的并行能力来加速此过程,那会很棒,因为解析的 json 文件以千兆字节为单位。
要获取第一个数据帧,您可以使用:
df = (
df.withColumn("data0", F.expr("transform(data.result, x -> x.metric.data0)"))
.withColumn("data1", F.expr("transform(data.result, x -> x.metric.data1)"))
.withColumn("data2", F.expr("transform(data.result, x -> x.metric.data2)"))
.withColumn("data3", F.expr("transform(data.result, x -> x.metric.data3)"))
.withColumn("values", F.expr("transform(data.result, x -> x.values)"))
.withColumn("items", F.array(F.lit(0), F.lit(1), F.lit(2)))
.withColumn("items", F.explode(F.col("items")))
.withColumn("data0", F.col("data0").getItem(F.col("items")))
.withColumn("data1", F.col("data1").getItem(F.col("items")))
.withColumn("data2", F.col("data2").getItem(F.col("items")))
.withColumn("data3", F.col("data3").getItem(F.col("items")))
.withColumn("values", F.col("values").getItem(F.col("items")))
.drop("data", "status", "items")
)
结果:
root
|-- data0: string (nullable = true)
|-- data1: string (nullable = true)
|-- data2: string (nullable = true)
|-- data3: string (nullable = true)
|-- values: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
+-----+-----+-----+-----+------------------+
|data0|data1|data2|data3|values |
+-----+-----+-----+-----+------------------+
|T |O |null |null |[[90, 0], [80, 0]]|
|K |S |null |null |[[70, 0], [60, 0]]|
|null |null |J |O |[[50, 0], [40, 0]]|
+-----+-----+-----+-----+------------------+
要获得第二个数据帧,方法相同,只是需要对 values 列额外执行一次 explode:
df = (
df.withColumn("data0", F.expr("transform(data.result, x -> x.metric.data0)"))
.withColumn("data1", F.expr("transform(data.result, x -> x.metric.data1)"))
.withColumn("data2", F.expr("transform(data.result, x -> x.metric.data2)"))
.withColumn("data3", F.expr("transform(data.result, x -> x.metric.data3)"))
.withColumn("values", F.expr("transform(data.result, x -> x.values)"))
.withColumn("items", F.array(F.lit(0), F.lit(1), F.lit(2)))
.withColumn("items", F.explode(F.col("items")))
.withColumn("data0", F.col("data0").getItem(F.col("items")))
.withColumn("data1", F.col("data1").getItem(F.col("items")))
.withColumn("data2", F.col("data2").getItem(F.col("items")))
.withColumn("data3", F.col("data3").getItem(F.col("items")))
.withColumn("values", F.col("values").getItem(F.col("items")))
.withColumn("values", F.explode("values"))
.withColumn("time", F.col("values").getItem(0))
.withColumn("value", F.col("values").getItem(1))
.drop("data", "status", "items", "values")
)
结果:
root
|-- data0: string (nullable = true)
|-- data1: string (nullable = true)
|-- data2: string (nullable = true)
|-- data3: string (nullable = true)
|-- time: string (nullable = true)
|-- value: string (nullable = true)
+-----+-----+-----+-----+----+-----+
|data0|data1|data2|data3|time|value|
+-----+-----+-----+-----+----+-----+
|T |O |null |null |90 |0 |
|T |O |null |null |80 |0 |
|K |S |null |null |70 |0 |
|K |S |null |null |60 |0 |
|null |null |J |O |50 |0 |
|null |null |J |O |40 |0 |
+-----+-----+-----+-----+----+-----+
- 更新:
自动化 data
名称和结果数量的示例:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import json
data_names = []
number_of_results = 0
with open("test.json", "r") as f_in:
raw_data = json.load(f_in)
for item in raw_data["data"]["result"]:
number_of_results += 1
for key in item["metric"].keys():
if key not in data_names:
data_names.append(key)
spark = SparkSession.builder.getOrCreate()
df = spark.read.option("multiline", True).json("test.json")
for data_name in data_names:
df = df.withColumn(
data_name, F.expr(f"transform(data.result, x -> x.metric.{data_name})")
)
df = (
df.withColumn("values", F.expr("transform(data.result, x -> x.values)"))
.withColumn("items", F.array(*[F.lit(x) for x in range(0, number_of_results)]))
.withColumn("items", F.explode(F.col("items")))
)
for data_name in data_names:
df = df.withColumn(data_name, F.col(data_name).getItem(F.col("items")))
df = df.withColumn("values", F.col("values").getItem(F.col("items"))).drop(
"data", "status", "items"
)
结果是第一个dataframe(同上)