如何使用 Spark-Scala 解析 JSON 数据
How to parse the JSON data using Spark-Scala
我需要解析 JSON 数据,如下面的预期结果所示,目前我不知道如何在信号列中包含信号名称(ABS、ADA、ADW)。任何帮助将非常感激。
我尝试了一些结果,结果如下所示,但我还需要在 SIGNAL 列中包含所有信号,这在预期结果中显示。
jsonDF.select(explode($"ABS") as "element").withColumn("stime", col("element.E")).withColumn("can_value", col("element.V")).drop(col("element")).show()
+-------------+--------- --+
| stime|can_value |
+-------------+--------- +
|value of E |value of V |
+-------------+----------- +
df.printSchema
-- ABS: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: long (nullable = true)
|-- ADA: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: long (nullable = true)
|-- ADW: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: long (nullable = true)
|-- ALT: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: double (nullable = true)
|-- APP: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: double (nullable = true)
I will need output like below:
-----------------+-------------+---------+
|SIGNAL |stime |can_value|
+-----------------+-------------+---------+
|ABS |value of E |value of V |
|ADA |value of E |value of V |
|ADW |value of E |value of V |
+-----------------+-------------+---------+
要获得预期的输出,并在信号列中插入值:
jsonDF.select(explode($"ABS") as "element")
.withColumn("stime", col("element.E"))
.withColumn("can_value", col("element.V"))
.drop(col("element"))
.withColumn("SIGNAL",lit("ABS"))
.show()
以及上述方法的通用版本:
(基于 df.printSchema 的结果假设,您将信号值作为列名称,并且这些列包含具有 struct(E,V) 形式元素的数组)
val columns:Array[String] = df.columns
var arrayOfDFs:Array[DataFrame] = Array()
for(col_name <- columns){
val temp = df.selectExpr("explode("+col_name+") as element")
.select(
lit(col_name).as("SIGNAL"),
col("element.E").as("stime"),
col("element.V").as("can_value"))
arrayOfDFs = arrayOfDFs :+ temp
}
val jsonDF = arrayOfDFs.reduce(_ union _)
jsonDF.show(false)
我需要解析 JSON 数据,如下面的预期结果所示,目前我不知道如何在信号列中包含信号名称(ABS、ADA、ADW)。任何帮助将非常感激。
我尝试了一些结果,结果如下所示,但我还需要在 SIGNAL 列中包含所有信号,这在预期结果中显示。
jsonDF.select(explode($"ABS") as "element").withColumn("stime", col("element.E")).withColumn("can_value", col("element.V")).drop(col("element")).show()
+-------------+--------- --+
| stime|can_value |
+-------------+--------- +
|value of E |value of V |
+-------------+----------- +
df.printSchema
-- ABS: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: long (nullable = true)
|-- ADA: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: long (nullable = true)
|-- ADW: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: long (nullable = true)
|-- ALT: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: double (nullable = true)
|-- APP: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: double (nullable = true)
I will need output like below:
-----------------+-------------+---------+
|SIGNAL |stime |can_value|
+-----------------+-------------+---------+
|ABS |value of E |value of V |
|ADA |value of E |value of V |
|ADW |value of E |value of V |
+-----------------+-------------+---------+
要获得预期的输出,并在信号列中插入值:
jsonDF.select(explode($"ABS") as "element")
.withColumn("stime", col("element.E"))
.withColumn("can_value", col("element.V"))
.drop(col("element"))
.withColumn("SIGNAL",lit("ABS"))
.show()
以及上述方法的通用版本:
(基于 df.printSchema 的结果假设,您将信号值作为列名称,并且这些列包含具有 struct(E,V) 形式元素的数组)
val columns:Array[String] = df.columns
var arrayOfDFs:Array[DataFrame] = Array()
for(col_name <- columns){
val temp = df.selectExpr("explode("+col_name+") as element")
.select(
lit(col_name).as("SIGNAL"),
col("element.E").as("stime"),
col("element.V").as("can_value"))
arrayOfDFs = arrayOfDFs :+ temp
}
val jsonDF = arrayOfDFs.reduce(_ union _)
jsonDF.show(false)