使用 Spark-Scala 解析列中的 JSON 根
Parse JSON root in a column using Spark-Scala
我遇到了问题,无法将 JSOM 的根转换为数据框中的记录,以获取未确定数量的记录。
我用 JSON 生成的数据框类似于以下内容:
val exampleJson = spark.createDataset(
"""
{"ITEM1512":
{"name":"Yin",
"address":{"city":"Columbus",
"state":"Ohio"}
},
"ITEM1518":
{"name":"Yang",
"address":{"city":"Working",
"state":"Marc"}
}
}""" :: Nil)
当我按照以下说明阅读它时
val itemsExample = spark.read.json(exampleJson)
生成的架构和数据框如下:
+-----------------------+-----------------------+
|ITEM1512 |ITEM1518 |
+-----------------------+-----------------------+
|[[Columbus, Ohio], Yin]|[[Working, Marc], Yang]|
+-----------------------+-----------------------+
root
|-- ITEM1512: struct (nullable = true)
| |-- address: struct (nullable = true)
| | |-- city: string (nullable = true)
| | |-- state: string (nullable = true)
| |-- name: string (nullable = true)
|-- ITEM1518: struct (nullable = true)
| |-- address: struct (nullable = true)
| | |-- city: string (nullable = true)
| | |-- state: string (nullable = true)
| |-- name: string (nullable = true)
但我想生成这样的东西:
+-----------------------+-----------------------+
|Item |Values |
+-----------------------+-----------------------+
|ITEM1512 |[[Columbus, Ohio], Yin]|
|ITEM1518 |[[Working, Marc], Yang]|
+-----------------------+-----------------------+
因此,为了解析此 JSON 数据,我需要读取所有列并将其添加到数据框中的记录中,因为我作为示例编写的项目不止这两项。事实上,我想在数据框中添加数百万个项目。
我正在尝试复制在此处找到的解决方案:
使用此代码:
val columns:Array[String] = itemsExample.columns
var arrayOfDFs:Array[DataFrame] = Array()
for(col_name <- columns){
val temp = itemsExample.selectExpr("explode("+col_name+") as element")
.select(
lit(col_name).as("Item"),
col("element.E").as("Value"))
arrayOfDFs = arrayOfDFs :+ temp
}
val jsonDF = arrayOfDFs.reduce(_ union _)
jsonDF.show(false)
但是我遇到了这个问题,而在另一个问题的例子中,根在数组中,在我的例子中,根是一个 StrucType。因此抛出下一个异常:
org.apache.spark.sql.AnalysisException: cannot resolve
'explode(ITEM1512
)' due to data type mismatch: input to function
explode should be array or map type, not
struct,name:string>
您可以使用stack
功能。
Example:
itemsExample.selectExpr("""stack(2,'ITEM1512',ITEM1512,'ITEM1518',ITEM1518) as (Item,Values)""").
show(false)
//+--------+-----------------------+
//|Item |Values |
//+--------+-----------------------+
//|ITEM1512|[[Columbus, Ohio], Yin]|
//|ITEM1518|[[Working, Marc], Yang]|
//+--------+-----------------------+
更新:
Dynamic Stack query:
val stack=df.columns.map(x => s"'${x}',${x}").mkString(s"stack(${df.columns.size},",",",")as (Item,Values)")
//stack(2,'ITEM1512',ITEM1512,'ITEM1518',ITEM1518) as (Item,Values)
itemsExample.selectExpr(stack).show()
//+--------+-----------------------+
//|Item |Values |
//+--------+-----------------------+
//|ITEM1512|[[Columbus, Ohio], Yin]|
//|ITEM1518|[[Working, Marc], Yang]|
//+--------+-----------------------+
我遇到了问题,无法将 JSOM 的根转换为数据框中的记录,以获取未确定数量的记录。
我用 JSON 生成的数据框类似于以下内容:
val exampleJson = spark.createDataset(
"""
{"ITEM1512":
{"name":"Yin",
"address":{"city":"Columbus",
"state":"Ohio"}
},
"ITEM1518":
{"name":"Yang",
"address":{"city":"Working",
"state":"Marc"}
}
}""" :: Nil)
当我按照以下说明阅读它时
val itemsExample = spark.read.json(exampleJson)
生成的架构和数据框如下:
+-----------------------+-----------------------+
|ITEM1512 |ITEM1518 |
+-----------------------+-----------------------+
|[[Columbus, Ohio], Yin]|[[Working, Marc], Yang]|
+-----------------------+-----------------------+
root
|-- ITEM1512: struct (nullable = true)
| |-- address: struct (nullable = true)
| | |-- city: string (nullable = true)
| | |-- state: string (nullable = true)
| |-- name: string (nullable = true)
|-- ITEM1518: struct (nullable = true)
| |-- address: struct (nullable = true)
| | |-- city: string (nullable = true)
| | |-- state: string (nullable = true)
| |-- name: string (nullable = true)
但我想生成这样的东西:
+-----------------------+-----------------------+
|Item |Values |
+-----------------------+-----------------------+
|ITEM1512 |[[Columbus, Ohio], Yin]|
|ITEM1518 |[[Working, Marc], Yang]|
+-----------------------+-----------------------+
因此,为了解析此 JSON 数据,我需要读取所有列并将其添加到数据框中的记录中,因为我作为示例编写的项目不止这两项。事实上,我想在数据框中添加数百万个项目。
我正在尝试复制在此处找到的解决方案:
val columns:Array[String] = itemsExample.columns
var arrayOfDFs:Array[DataFrame] = Array()
for(col_name <- columns){
val temp = itemsExample.selectExpr("explode("+col_name+") as element")
.select(
lit(col_name).as("Item"),
col("element.E").as("Value"))
arrayOfDFs = arrayOfDFs :+ temp
}
val jsonDF = arrayOfDFs.reduce(_ union _)
jsonDF.show(false)
但是我遇到了这个问题,而在另一个问题的例子中,根在数组中,在我的例子中,根是一个 StrucType。因此抛出下一个异常:
org.apache.spark.sql.AnalysisException: cannot resolve 'explode(
ITEM1512
)' due to data type mismatch: input to function explode should be array or map type, not struct,name:string>
您可以使用stack
功能。
Example:
itemsExample.selectExpr("""stack(2,'ITEM1512',ITEM1512,'ITEM1518',ITEM1518) as (Item,Values)""").
show(false)
//+--------+-----------------------+
//|Item |Values |
//+--------+-----------------------+
//|ITEM1512|[[Columbus, Ohio], Yin]|
//|ITEM1518|[[Working, Marc], Yang]|
//+--------+-----------------------+
更新:
Dynamic Stack query:
val stack=df.columns.map(x => s"'${x}',${x}").mkString(s"stack(${df.columns.size},",",",")as (Item,Values)")
//stack(2,'ITEM1512',ITEM1512,'ITEM1518',ITEM1518) as (Item,Values)
itemsExample.selectExpr(stack).show()
//+--------+-----------------------+
//|Item |Values |
//+--------+-----------------------+
//|ITEM1512|[[Columbus, Ohio], Yin]|
//|ITEM1518|[[Working, Marc], Yang]|
//+--------+-----------------------+