Spark unable to explode column

For the given JSON response:

{
    "id": "1575972348068_1649088229",
    "results": [
        {
            "rows_count": 53,
            "runtime_seconds": 0.004000000189989805,
            "columns": [
                "ROLE_ID",
                "ROLE_NAME"
            ],
            "columns_type": [
                "number",
                "string"
            ],
            "limit": 2000000000,
            "index": 0,
            "rows": [
                [
                    "6",
                    "Incentive Plan Advisor                                                                              "
                ],
                [
                    "7",
                    "Security Admin                                                                                      "
                ]
            ],
            "command": "<an sql command>"
        }
    ],
    "status": "completed"
}
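
(For reference, a payload like this can be loaded into a DataFrame named response via the JSON reader on a single-element Dataset[String]; a minimal sketch, with the session setup assumed since the question does not show that step:)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("explode-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Abbreviated copy of the payload above; internal newlines are legal JSON whitespace.
val jsonPayload =
  """{"id":"1575972348068_1649088229","status":"completed","results":[
    |{"rows_count":53,"runtime_seconds":0.004000000189989805,
    |"columns":["ROLE_ID","ROLE_NAME"],"columns_type":["number","string"],
    |"limit":2000000000,"index":0,"command":"<an sql command>",
    |"rows":[["6","Incentive Plan Advisor"],["7","Security Admin"]]}]}""".stripMargin

// Each element of the Dataset[String] is parsed as one JSON record.
val response = spark.read.json(Seq(jsonPayload).toDS())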

I want to get rows from this JSON as a Spark DataFrame. To do that, I'm trying to explode the results entry using:

response.show()
val flattened = response.select($"results", explode($"results").as("results_flat1")).select($"results_flat1")
flattened.show()

I get a response like this:

+--------------------+--------------------+---------+
|                  id|             results|   status|
+--------------------+--------------------+---------+
|1575972687102_374...|[[[ROLE_ID, ROLE_...|completed|
+--------------------+--------------------+---------+

+--------------------+
|       results_flat1|
+--------------------+
|[[ROLE_ID, ROLE_N...|
+--------------------+

When I try to run another explode, I get this error:

flattened.select($"results_flat1", explode($"results_flat1").as("results_flat2"))
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`results_flat1`)' due to data type mismatch: input to function explode should be array or map type, not struct<columns:array<string>,columns_type:array<string>,command:string,index:bigint,limit:bigint,rows:array<array<string>>,rows_count:bigint,runtime_seconds:double>;;
'Project [results_flat1#91, explode(results_flat1#91) AS results_flat2#99]
+- Project [results_flat1#91]
   +- Project [results#75, results_flat1#91]
      +- Generate explode(results#75), false, [results_flat1#91]
         +- LogicalRDD [id#74, results#75, status#76], false

From my analysis, I can see that explode only works on an array or a map, not on a struct. To get around this, I tried to re-parse the value as a JSON string:

val x = spark.read.json(Seq(flattened.first().get(0).asInstanceOf[String]).toDS())
x.show()

Trying this, I get another error:

Exception in thread "main" java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.String
    at org.apache.spark.sql.Row$class.getString(Row.scala:255)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:166)
    at com.ibm.cmdwcloud.operations.SelectOperations.getRoleListFromEntitlement(SelectOperations.scala:23)
    at com.ibm.cmdwcloud.Main$.main(Main.scala:22)
    at com.ibm.cmdwcloud.Main.main(Main.scala)
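
The cast fails because first().get(0) returns a Row (a GenericRowWithSchema), not a String. If a JSON string per row is really what is wanted, Dataset.toJSON produces exactly that; a minimal sketch:

// get(0) yields a Row, hence the ClassCastException above.
// toJSON serializes each row of the DataFrame back into a JSON string:
val asJson = flattened.toJSON        // Dataset[String]
val x2 = spark.read.json(asJson)
x2.show()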

I don't know of any way to get at the rows object directly and convert it to a DataFrame. Please help me solve this.

Edit:

I can, however, see this schema:

root
 |-- results_flat1: struct (nullable = true)
 |    |-- columns: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- columns_type: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- command: string (nullable = true)
 |    |-- index: long (nullable = true)
 |    |-- limit: long (nullable = true)
 |    |-- rows: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |-- rows_count: long (nullable = true)
 |    |-- runtime_seconds: double (nullable = true)

But I'm unable to explode on this..
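
(For reference: the struct itself is not explodable, but its array fields can be reached with dot notation, which is what the answer below builds on. A minimal sketch:)

import org.apache.spark.sql.functions.explode

// results_flat1 is a struct, so explode() rejects it, but
// results_flat1.rows is array<array<string>>, which explode() accepts:
flattened.select(explode($"results_flat1.rows").as("row")).show()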

Edit 2:

Thanks to the answer below, I'm one step closer to what I want to achieve. I executed this:

val flattened = response.select($"results", explode($"results").as("results_flat1"))
  .select("results_flat1.*")
  .select($"rows", explode($"rows").as("rows_flat"))
  .select($"rows_flat")

flattened.show()

and got this output:

+--------------------+
|           rows_flat|
+--------------------+
|[6, Incentive Pla...|
|[7, Security Admi...|
+--------------------+

Is it now possible to explode this and map it onto a schema as well, so that I can get something like:

+--------------------+--------------------+
|             role_id|           role_name|
+--------------------+--------------------+
|                   6| Incentive Plan Ad..|
|                   7|      Security Admin|
+--------------------+--------------------+

You don't have to explode the struct twice.

Does this work for you?

val flattened = response.select(explode($"results").as("results_flat1"))
        .select("results_flat1.*")
flattened.show(false)

+--------------------+----------------+----------------+-----+----------+--------------------------------------------------------------------------+----------+--------------------+
|columns             |columns_type    |command         |index|limit     |rows                                                                      |rows_count|runtime_seconds     |
+--------------------+----------------+----------------+-----+----------+--------------------------------------------------------------------------+----------+--------------------+
|[ROLE_ID, ROLE_NAME]|[number, string]|<an sql command>|0    |2000000000|[WrappedArray(6, Incentive Plan Advisor), WrappedArray(7, Security Admin)]|53        |0.004000000189989805|
+--------------------+----------------+----------------+-----+----------+--------------------------------------------------------------------------+----------+--------------------+
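
(show(false) disables column truncation, which is why the full rows contents, rendered here as WrappedArray, are visible.)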

Edit 2: Here is the solution for your (Edit 2):

val flattened = response.select($"results", explode($"results").as("results_flat1"))
  .select("results_flat1.*")
  .select(explode($"rows").as("rows"))
  .select($"rows".getItem(0).as("idx"), $"rows".getItem(1).as("label"))

Output:

+---+--------------------+
|idx|               label|
+---+--------------------+
|  6|Incentive Plan Ad...|
|  7|      Security Admin|
+---+--------------------+
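
To land on the role_id/role_name columns asked for in Edit 2, the positional columns can be renamed and cast; a trim is also worth considering, since the source strings carry trailing spaces. A minimal sketch (the name roles is illustrative):

import org.apache.spark.sql.functions.trim

// Rename the positional columns, cast the id (columns_type reports "number"),
// and trim the trailing spaces present in the source strings.
val roles = flattened.select(
  $"idx".cast("long").as("role_id"),
  trim($"label").as("role_name")
)
roles.show()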