Extract array from list of json strings using Spark
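I have a column in my DataFrame that contains a list of JSON objects, but its type is string. I need to run explode on this column, so I first need to convert it into an array. I couldn't find many references for this use case.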
Sample data:
columnName: "[{"name":"a","info":{"age":"1","grade":"b"},"other":7},{"random":"x"}, {...}]"
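That is what the data looks like. The fields are not fixed: the element at index 0 may hold a nested JSON object and some fields, while the element at index 1 may hold some of those fields plus others. The list can also contain more nested JSON objects or additional fields. I am currently using this: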
"""explode(split(regexp_replace(regexp_replace(colName, '(\\},)','}},'), '(\\[|\\])',''), "},")) as colName"""
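I replace every "}," with "}},", remove the "[" and "]", and then split on "},", but this approach doesn't work because of the nested JSON objects: a "}," that closes a nested object looks exactly like one that separates two array elements.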
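To see why the regex approach breaks, here is the same sequence of replacements replayed on the sample value in plain Scala. This is only an illustration: the trailing `{...}` placeholder is dropped, and `java.lang.String`'s `replaceAll`/`split` stand in for Spark's `regexp_replace`/`split`, which we assume behave the same way for these simple patterns.

```scala
// Sample value from the question, without the trailing {...} placeholder
val raw = """[{"name":"a","info":{"age":"1","grade":"b"},"other":7},{"random":"x"}]"""

// Same three steps as the Spark expression:
// 1. turn every "}," into "}},"  2. strip "[" and "]"  3. split on "},"
val naive: Array[String] =
  raw
    .replaceAll("\\},", "}},")
    .replaceAll("\\[|\\]", "")
    .split("\\},")

// The "}," that closes the nested "info" object is treated as an element
// boundary, so the first array element is cut in two:
// naive(0) == {"name":"a","info":{"age":"1","grade":"b"}
// naive(1) == "other":7}
// naive(2) == {"random":"x"}
```

Stripping every "[" and "]" would also corrupt any nested arrays or string values containing brackets, so the regex route cannot be fixed by tweaking the patterns alone.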
How can I extract the array from the string?
You can try this:
// Initial DataFrame
df.show(false)
+----------------------------------------------------------------------+
|columnName |
+----------------------------------------------------------------------+
|[{"name":"a","info":{"age":"1","grade":"b"},"other":7},{"random":"x"}]|
+----------------------------------------------------------------------+
df.printSchema()
root
|-- columnName: string (nullable = true)
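// toArray is a user-defined function that parses a JSON array stored as a string
// and returns each element object as its own JSON string (requires org.json on the classpath).
// Note: org.json's JSONObject does not preserve key order, so keys may come out reordered.
import org.json.JSONArray
import org.apache.spark.sql.functions._
val toArray = udf { (data: String) =>
  if (data == null) Array.empty[String] // a null input would otherwise throw inside JSONArray
  else {
    val jsonArray = new JSONArray(data)
    // Serialize every element object back to a JSON string
    (0 until jsonArray.length).map(i => jsonArray.getJSONObject(i).toString).toArray
  }
}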
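The UDF above depends on org.json. If that library is not available, one rough plain-Scala alternative is to scan the string once and cut out each top-level `{...}` by tracking brace depth, ignoring braces that appear inside string literals. This is a sketch under stated assumptions: the helper name `splitTopLevelObjects` is hypothetical, and it assumes well-formed input whose elements are all objects; it does not validate the JSON.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper: cuts every top-level {...} element out of a JSON array
// string. It tracks brace depth and skips braces inside string literals
// (including escaped quotes). Assumes well-formed input whose elements are
// all objects; it does NOT otherwise validate the JSON.
def splitTopLevelObjects(json: String): List[String] = {
  val out = ArrayBuffer.empty[String]
  var depth = 0        // current {...} nesting depth
  var inString = false // true while inside "..."
  var escaped = false  // true right after a backslash inside a string
  var start = -1       // index where the current top-level object starts
  for (i <- json.indices) {
    val c = json.charAt(i)
    if (inString) {
      if (escaped) escaped = false
      else if (c == '\\') escaped = true
      else if (c == '"') inString = false
    } else {
      c match {
        case '"' => inString = true
        case '{' =>
          if (depth == 0) start = i
          depth += 1
        case '}' =>
          depth -= 1
          // Closing the outermost brace completes one array element
          if (depth == 0) out += json.substring(start, i + 1)
        case _ => // other characters need no bookkeeping
      }
    }
  }
  out.toList
}

val sample = """[{"name":"a","info":{"age":"1","grade":"b"},"other":7},{"random":"x"}]"""
val elements = splitTopLevelObjects(sample)
```

In Spark it could be wrapped the same way as toArray, e.g. `udf((s: String) => if (s == null) Seq.empty[String] else splitTopLevelObjects(s))`. Because it copies substrings, each element keeps its original key order, unlike the org.json round trip.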
// Using the udf and exploding the resultant array
val df1 = df.withColumn("columnName",explode(toArray(col("columnName"))))
df1.show(false)
+-----------------------------------------------------+
|columnName |
+-----------------------------------------------------+
|{"other":7,"name":"a","info":{"grade":"b","age":"1"}}|
|{"random":"x"} |
+-----------------------------------------------------+
df1.printSchema()
root
|-- columnName: string (nullable = true)
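// Parse the JSON strings with a schema inferred dynamically from all rows
// (spark.read.json scans the data once to infer it; spark-shell already
// imports spark.implicits._, which provides .as[String])
import spark.implicits._
val schema = spark.read.json(df1.select("columnName").as[String]).schema
val df2 = df1.withColumn("columnName", from_json(col("columnName"), schema))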
df2.show(false)
+---------------+
|columnName |
+---------------+
|[[1, b], a, 7,]|
|[,,, x] |
+---------------+
df2.printSchema()
root
|-- columnName: struct (nullable = true)
| |-- info: struct (nullable = true)
| | |-- age: string (nullable = true)
| | |-- grade: string (nullable = true)
| |-- name: string (nullable = true)
| |-- other: long (nullable = true)
| |-- random: string (nullable = true)
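As the printSchema output suggests, the inferred schema is the union of the top-level keys found across all rows, listed by field name, and each row gets null for the keys it lacks (which is where the nulls in the next output come from). A plain-Scala sketch of that merge, which only models key names and not types; the key sets are copied by hand from the df1 output above:

```scala
// Top-level keys of each exploded row (copied from the df1 output above)
val row0Keys = Set("name", "info", "other")
val row1Keys = Set("random")

// The merged schema has to cover every key seen in any row; sorting by name
// reproduces the field order shown by printSchema (info, name, other, random)
val mergedFields: Seq[String] = (row0Keys ++ row1Keys).toSeq.sorted

// For each row, the fields it does not contain are the ones that come back null
val missingPerRow: Seq[Set[String]] =
  Seq(row0Keys, row1Keys).map(keys => mergedFields.toSet -- keys)
```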
// Extracting all the fields from the json
df2.select(col("columnName.*")).show(false)
+------+----+-----+------+
|info |name|other|random|
+------+----+-----+------+
|[1, b]|a |7 |null |
|null |null|null |x |
+------+----+-----+------+
Edit:
If you can use the get_json_object function, you can try this instead:
// Get the list of top-level keys dynamically
// (if `schema` from the previous step is in scope, schema.fieldNames gives the
// same list without another pass over the data)
import spark.implicits._
val columns = spark.read.json(df1.select("columnName").as[String]).columns
// Build one get_json_object extraction per key, aliased to the key name
import org.apache.spark.sql.Column
val extract_columns: Array[Column] =
  columns.map(column => get_json_object(col("columnName"), "$." + column).as(column))
df1.select(extract_columns: _*).show(false)
+-----------------------+----+-----+------+
|info |name|other|random|
+-----------------------+----+-----+------+
|{"grade":"b","age":"1"}|a |7 |null |
|null |null|null |x |
+-----------------------+----+-----+------+
Note that the info column here is not a struct; it is still a JSON string. You may have to extract the nested JSON columns in a similar way.
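For that nested case, get_json_object also accepts deeper paths such as `$.info.age`. A plain-Scala sketch of generating those paths and column aliases, assuming the nested key names are already known; the `nestedKeys` map is hypothetical and hard-coded from the sample (in practice it could be read off df2's inferred schema):

```scala
// Hypothetical input: parent key -> its nested keys, hard-coded from the sample
val nestedKeys: Map[String, Seq[String]] = Map("info" -> Seq("age", "grade"))

// Build (JSONPath, alias) pairs such as ("$.info.age", "info_age")
val pathsAndAliases: Seq[(String, String)] =
  nestedKeys.toSeq.flatMap { case (parent, keys) =>
    keys.map(k => (s"$$.$parent.$k", s"${parent}_$k"))
  }
```

Each pair could then become a column via `get_json_object(col("columnName"), path).as(alias)` and be passed to `df1.select(...)`, just like the top-level columns above.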