How to parse JSON data using Spark-Scala

I need to parse JSON data into the expected result shown below, but I currently don't know how to include the signal names (ABS, ADA, ADW) in a SIGNAL column. Any help would be much appreciated.

I have tried the following, which produces the result shown below, but I also need the SIGNAL column to contain every signal name, as shown in the expected result.

jsonDF.select(explode($"ABS") as "element")
    .withColumn("stime", col("element.E"))
    .withColumn("can_value", col("element.V"))
    .drop(col("element"))
    .show()

+-----------+-----------+
|stime      |can_value  |
+-----------+-----------+
|value of E |value of V |
+-----------+-----------+

df.printSchema

root
 |-- ABS: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: long (nullable = true)
 |-- ADA: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: long (nullable = true)
 |-- ADW: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: long (nullable = true)
 |-- ALT: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: double (nullable = true)
 |-- APP: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: double (nullable = true)
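For reference, a single input record consistent with this schema might look like the following (the values shown are hypothetical; note that V is a long for ABS/ADA/ADW and a double for ALT/APP):

```json
{
  "ABS": [{"E": 100, "V": 1}, {"E": 200, "V": 0}],
  "ADA": [{"E": 100, "V": 0}],
  "ADW": [{"E": 100, "V": 1}],
  "ALT": [{"E": 100, "V": 0.5}],
  "APP": [{"E": 100, "V": 0.2}]
}
```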

I need output like the following:

+------+-----------+-----------+
|SIGNAL|stime      |can_value  |
+------+-----------+-----------+
|ABS   |value of E |value of V |
|ADA   |value of E |value of V |
|ADW   |value of E |value of V |
+------+-----------+-----------+

To get the expected output for a single signal, insert the signal name into a SIGNAL column with a literal:

jsonDF.select(explode($"ABS") as "element")
    .withColumn("stime", col("element.E"))
    .withColumn("can_value", col("element.V"))
    .drop(col("element"))
    .withColumn("SIGNAL", lit("ABS"))
    .show()

And a generic version of the approach above:

(This assumes, based on the df.printSchema output, that the signal names are the column names and that each of those columns holds an array whose elements are structs of the form struct(E, V).)

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

// Build one DataFrame per signal column, each tagged with its signal name
val columns: Array[String] = df.columns

var arrayOfDFs: Array[DataFrame] = Array()

for (col_name <- columns) {
  // Explode the array column, then pull E and V out of each struct element
  val temp = df.selectExpr(s"explode($col_name) as element")
    .select(
      lit(col_name).as("SIGNAL"),
      col("element.E").as("stime"),
      col("element.V").as("can_value"))

  arrayOfDFs = arrayOfDFs :+ temp
}

val jsonDF = arrayOfDFs.reduce(_ union _)
jsonDF.show(false)
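The loop above boils down to "flatten each signal's array into rows, tag each row with the signal name, then concatenate". As a plain-Scala sketch of that logic, without Spark (the `FlattenSignals` object and the sample data are hypothetical, for illustration only):

```scala
// A plain-Scala sketch (no Spark) of what the generic loop computes:
// flatten each signal's (E, V) array into rows tagged with the signal name.
object FlattenSignals {
  // signal name -> array of (E, V)  ==>  rows of (SIGNAL, stime, can_value)
  def flatten(signals: Map[String, Seq[(Long, Double)]]): Seq[(String, Long, Double)] =
    signals.toSeq.flatMap { case (name, elems) =>
      elems.map { case (e, v) => (name, e, v) }
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample data shaped like the schema above
    val sample = Map(
      "ABS" -> Seq((100L, 1.0), (200L, 0.0)),
      "ADA" -> Seq((100L, 0.5))
    )
    flatten(sample).foreach(println)
  }
}
```

The Spark version does the same thing, except each per-signal result is a distributed DataFrame and the concatenation is `union`, which matches rows by column position.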