Flatten any nested JSON string and convert to a DataFrame using Spark Scala
I am trying to create a DataFrame from any JSON string. The JSON strings are usually deep and sometimes nested. The JSON string is similar to:
val json_string = """{
  "Total Value": 3,
  "Topic": "Example",
  "values": [
    {
      "value1": "#example1",
      "points": [
        [
          "123",
          "156"
        ]
      ],
      "properties": {
        "date": "12-04-19",
        "model": "Model example 1"
      }
    },
    {
      "value2": "#example2",
      "points": [
        [
          "124",
          "157"
        ]
      ],
      "properties": {
        "date": "12-05-19",
        "model": "Model example 2"
      }
    }
  ]
}"""
The output I expect is:
+-----------+-----------+----------+------------------+------------------+------------------------+-----------------------------+
|Total Value| Topic     |values 1  | values.points[0] | values.points[1] | values.properties.date | values.properties.model     |
+-----------+-----------+----------+------------------+------------------+------------------------+-----------------------------+
| 3         | Example   | example1 | 123              | 156              | 12-04-19               | Model example 1             |
| 3         | Example   | example2 | 124              | 157              | 12-05-19               | Model example 2             |
+-----------+-----------+----------+------------------+------------------+------------------------+-----------------------------+
I have the flattening working, but I have to pick a key in the JSON to obtain the schema and then flatten from there, and I do not want to do it that way. It should be independent of any key being supplied and should flatten accordingly, as shown in the output above. Even when the key given is 'values', as in this case, since points is an array I should still get 2 columns for the same record, so points[0] is one column and points[1] is a different column. My Spark Scala code is:
val key = "values" // Ideally this should not be given in my case.
val jsonFullDFSchemaString = spark.read.json(json_location).select(col(key)).schema.json // changing values to reportData
val jsonFullDFSchemaStructType = DataType.fromJson(jsonFullDFSchemaString).asInstanceOf[StructType]
val df = spark.read.schema(jsonFullDFSchemaStructType).json(json_location)
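(For reference, the JSON string can also be loaded directly, without selecting a root key first. A minimal sketch, assuming a SparkSession named spark is in scope, as in the answers below:)
import spark.implicits._

// Parse the raw JSON string into a DataFrame; Spark infers the full
// nested schema (structs and arrays) automatically.
val df = spark.read.json(Seq(json_string).toDS)
df.printSchema()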
Now, to flatten it, I am using:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, StructType}

def flattenDataframe(df: DataFrame): DataFrame = {
  // getting all the fields from the schema
  val fields = df.schema.fields
  val fieldNames = fields.map(x => x.name)
  for (i <- fields.indices) {
    val field = fields(i)
    val fieldtype = field.dataType
    val fieldName = field.name
    fieldtype match {
      case _: ArrayType =>
        // backticks guard column names that contain spaces, e.g. "Total Value"
        val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName).map(n => s"`$n`")
        val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer(`$fieldName`) as `$fieldName`")
        val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
        return flattenDataframe(explodedDf)
      case structType: StructType =>
        val childFieldnames = structType.fieldNames.map(childname => fieldName + "." + childname)
        val newfieldNames = fieldNames.filter(_ != fieldName) ++ childFieldnames
        val renamedcols = newfieldNames.map(x =>
          col(x).as(x.replace(".", "_").replace("$", "_").replace("__", "_").replace(" ", "").replace("-", "")))
        val explodedf = df.select(renamedcols: _*)
        return flattenDataframe(explodedf)
      case _ =>
    }
  }
  df
}
Now, finally, to get the flattened DataFrame from the JSON:
val tableSchemaDF = flattenDataframe(df)
println(tableSchemaDF)
So ideally, any JSON file should be flattened accordingly, as I have shown above, without providing any root key and without creating 2 rows. I hope I have provided enough details. Any help would be appreciated. Thanks.
Please note: the JSON data comes from an API, so it is not certain that the root key 'values' will be present. That is why I am not going to provide a key for flattening.
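(For context: the duplicated rows come from exploding the inner array, since explode on an array<string> emits one row per element. A minimal sketch of the alternative, turning a fixed-length array into indexed columns instead; the helper name arrayToColumns and the assumed bound on the array length are illustrative:)
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Turn a fixed-length array column into one column per element,
// e.g. points -> points[0], points[1], rather than one row per element.
// Assumes `size` is a known upper bound on the array length.
def arrayToColumns(df: DataFrame, arrayCol: String, size: Int): DataFrame = {
  val indexed = (0 until size).map(i => col(arrayCol).getItem(i).as(s"$arrayCol[$i]"))
  val others  = df.columns.filter(_ != arrayCol).map(col(_))
  df.select(others ++ indexed: _*)
}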
Here is a recursion-based solution, a bit "hacky" at the end because of your specific requirements:
import scala.annotation.tailrec
import spark.implicits._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col, explode}
import org.apache.spark.sql.types.{ArrayType, StructType}

@tailrec
def recurs(df: DataFrame): DataFrame = {
  if (df.schema.fields.find(_.dataType match {
    case ArrayType(StructType(_), _) | StructType(_) => true
    case _ => false
  }).isEmpty) df
  else {
    val columns = df.schema.fields.map(f => f.dataType match {
      case _: ArrayType  => explode(col(f.name)).as(f.name)
      case s: StructType => col(s"${f.name}.*")
      case _             => col(f.name)
    })
    recurs(df.select(columns: _*))
  }
}
val recursedDF = recurs(df)
val valuesColumns = recursedDF.columns.filter(_.startsWith("value"))
val projectionDF = recursedDF
  .withColumn("values", coalesce(valuesColumns.map(col): _*))
  .withColumn("point[0]", $"points".getItem(0))
  .withColumn("point[1]", $"points".getItem(1))
  .drop(valuesColumns :+ "points": _*)
projectionDF.show(false)
Output:
+-------+-----------+--------+---------------+---------+--------+--------+
|Topic |Total Value|date |model |values |point[0]|point[1]|
+-------+-----------+--------+---------------+---------+--------+--------+
|Example|3 |12-04-19|Model example 1|#example1|123 |156 |
|Example|3 |12-05-19|Model example 2|#example2|124 |157 |
+-------+-----------+--------+---------------+---------+--------+--------+
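One caveat, if it matters for your API data: explode drops rows whose array is null or empty. A sketch of the same column mapping using explode_outer, which keeps such rows:
import org.apache.spark.sql.functions.{col, explode_outer}
import org.apache.spark.sql.types.{ArrayType, StructType}

val columns = df.schema.fields.map(f => f.dataType match {
  case _: ArrayType  => explode_outer(col(f.name)).as(f.name) // null/empty arrays survive as null
  case s: StructType => col(s"${f.name}.*")
  case _             => col(f.name)
})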
I would rather suggest going with the Spark built-in functions. You can take advantage of Spark's explode function to achieve this.
Here is the code snippet.
scala> val df = spark.read.json(Seq(json_string).toDS)
scala> var dfd = df.select($"topic",$"total value",explode($"values").as("values"))
Here I am selecting the columns according to your needs. If any column is missing from the DataFrame, add it per your requirements.
scala> dfd.select($"topic",$"total value",$"values.points".getItem(0)(0).as("point_0"),$"values.points".getItem(0)(1).as("point_1"),$"values.properties.date".as("_date"),$"values.properties.model".as("_model")).show
+-------+-----------+-------+-------+--------+---------------+
| topic|total value|point_0|point_1| _date| _model|
+-------+-----------+-------+-------+--------+---------------+
|Example| 3| 123| 156|12-04-19|Model example 1|
|Example| 3| 124| 157|12-05-19|Model example 2|
+-------+-----------+-------+-------+--------+---------------+
This approach will give you the best results if the number of columns in the JSON is limited.
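If you also need the value1/value2 field from this approach (it is not in the select above), a small sketch using coalesce, along the lines of the first answer; withValue is an illustrative name:
import org.apache.spark.sql.functions.coalesce

// value1 and value2 are never both set on the same record,
// so coalesce collapses them into one column.
val withValue = dfd.withColumn("value", coalesce($"values.value1", $"values.value2"))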