Spark unable to explode column
For the given JSON response:
{
  "id": "1575972348068_1649088229",
  "results": [
    {
      "rows_count": 53,
      "runtime_seconds": 0.004000000189989805,
      "columns": [
        "ROLE_ID",
        "ROLE_NAME"
      ],
      "columns_type": [
        "number",
        "string"
      ],
      "limit": 2000000000,
      "index": 0,
      "rows": [
        [
          "6",
          "Incentive Plan Advisor "
        ],
        [
          "7",
          "Security Admin "
        ]
      ],
      "command": "<an sql command>"
    }
  ],
  "status": "completed"
}
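(For context, the response DataFrame used below can be built straight from this payload; a minimal sketch, assuming the JSON sits in a local file at a hypothetical path:)
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("explode-demo").master("local[*]").getOrCreate()
import spark.implicits._

// multiLine is needed because the JSON object spans several lines
val response = spark.read.option("multiLine", "true").json("/tmp/response.json")
response.printSchema()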
I want to get the rows from this JSON as a Spark DataFrame. To do that, I am trying to explode the results entry using:
response.show()
val flattened = response.select($"results", explode($"results").as("results_flat1")).select($"results_flat1")
flattened.show()
I get output like this:
+--------------------+--------------------+---------+
| id| results| status|
+--------------------+--------------------+---------+
|1575972687102_374...|[[[ROLE_ID, ROLE_...|completed|
+--------------------+--------------------+---------+
+--------------------+
| results_flat1|
+--------------------+
|[[ROLE_ID, ROLE_N...|
+--------------------+
When I try to perform another explode, I get this error:
flattened.select($"results_flat1", explode($"results_flat1").as("results_flat2"))
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`results_flat1`)' due to data type mismatch: input to function explode should be array or map type, not struct<columns:array<string>,columns_type:array<string>,command:string,index:bigint,limit:bigint,rows:array<array<string>>,rows_count:bigint,runtime_seconds:double>;;
'Project [results_flat1#91, explode(results_flat1#91) AS results_flat2#99]
+- Project [results_flat1#91]
+- Project [results#75, results_flat1#91]
+- Generate explode(results#75), false, [results_flat1#91]
+- LogicalRDD [id#74, results#75, status#76], false
From my analysis, I can see that for explode to work we need a string or an array of strings. To that end, I tried:
val x = spark.read.json(Seq(flattened.first().get(0).asInstanceOf[String]).toDS())
x.show()
Trying this, I get another error:
Exception in thread "main" java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.String
at org.apache.spark.sql.Row$class.getString(Row.scala:255)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:166)
at com.ibm.cmdwcloud.operations.SelectOperations.getRoleListFromEntitlement(SelectOperations.scala:23)
at com.ibm.cmdwcloud.Main$.main(Main.scala:22)
at com.ibm.cmdwcloud.Main.main(Main.scala)
I don't know of any way to get the rows object directly and convert it into a DataFrame. Please help me solve this.
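(As an aside, the ClassCastException happens because flattened.first().get(0) is a Row holding the struct, not a JSON string, so roundtripping through spark.read.json is not needed. Purely for illustration, Row-level access would look roughly like this sketch, although it collects everything to the driver:)
import org.apache.spark.sql.Row

// position 0 holds the results_flat1 struct as a Row, not a String
val inner: Row = flattened.first().getStruct(0)
// the nested array<array<string>> surfaces as a Seq of Seqs
val rawRows: Seq[Seq[String]] = inner.getAs[Seq[Seq[String]]]("rows")
rawRows.foreach(r => println(r.mkString(", ")))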
Edit:
I can see this schema, though:
root
|-- results_flat1: struct (nullable = true)
| |-- columns: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- columns_type: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- command: string (nullable = true)
| |-- index: long (nullable = true)
| |-- limit: long (nullable = true)
| |-- rows: array (nullable = true)
| | |-- element: array (containsNull = true)
| | | |-- element: string (containsNull = true)
| |-- rows_count: long (nullable = true)
| |-- runtime_seconds: double (nullable = true)
But I am unable to explode on this..
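The schema also explains the failure: results_flat1 is a struct, and explode only accepts array or map columns. The nested rows array can be reached with dot-path syntax instead of a second explode on the struct; a minimal sketch of that idea:
import org.apache.spark.sql.functions.explode

// explode the array field inside the struct, addressed by dot path
val rowsOnly = flattened.select(explode($"results_flat1.rows").as("row"))
rowsOnly.printSchema()  // row: array (element: string)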
Edit 2:
Thanks to the answer below, I am one step closer to what I want to achieve. I executed this:
val flattened = response.select($"results", explode($"results").as("results_flat1"))
.select("results_flat1.*")
.select($"rows", explode($"rows").as("rows_flat"))
.select($"rows_flat")
flattened.show()
and got this output:
+--------------------+
| rows_flat|
+--------------------+
|[6, Incentive Pla...|
|[7, Security Admi...|
+--------------------+
Is it now possible to explode this and map it further onto a schema, so that I can get something like:
+--------------------+--------------------+
| role_id| role_name|
+--------------------+--------------------+
| 6| Incentive Plan Ad..|
| 7| Security Admin|
+--------------------+--------------------+
You don't have to explode the struct twice.
Does this work for you?
val flattened = response.select(explode($"results").as("results_flat1"))
.select("results_flat1.*")
flattened.show(false)
+--------------------+----------------+----------------+-----+----------+--------------------------------------------------------------------------+----------+--------------------+
|columns |columns_type |command |index|limit |rows |rows_count|runtime_seconds |
+--------------------+----------------+----------------+-----+----------+--------------------------------------------------------------------------+----------+--------------------+
|[ROLE_ID, ROLE_NAME]|[number, string]|<an sql command>|0 |2000000000|[WrappedArray(6, Incentive Plan Advisor), WrappedArray(7, Security Admin)]|53 |0.004000000189989805|
+--------------------+----------------+----------------+-----+----------+--------------------------------------------------------------------------+----------+--------------------+
Edit 2: Here is the solution for your (Edit 2):
val flattened = response.select($"results", explode($"results").as("results_flat1"))
.select("results_flat1.*")
.select(explode($"rows").as("rows"))
.select($"rows".getItem(0).as("idx"),$"rows".getItem(1).as("label"))
Output:
+---+--------------------+
|idx| label|
+---+--------------------+
| 6|Incentive Plan Ad...|
| 7| Security Admin|
+---+--------------------+
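As a follow-up, if you want the column names from your desired output (role_id as a number, role_name as text), the last select can be adjusted; a sketch, assuming ROLE_ID always parses cleanly as an integer:
// names are chosen to match the desired output in the question,
// not read dynamically from the payload's "columns" list
val roles = response.select(explode($"results").as("r"))
  .select(explode($"r.rows").as("row"))
  .select(
    $"row".getItem(0).cast("int").as("role_id"),
    $"row".getItem(1).as("role_name")
  )
roles.show(false)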