将 JSON 字符串列拆分为多列
Split JSON string column to multiple columns
我正在寻找一种通用解决方案,以将所有 json 字段提取为 JSON 字符串列中的列。
df = spark.read.load(path)
df.show()
'path'中文件的文件格式为parquet
示例数据
|id | json_data
| 1 | {"name":"abc", "depts":["dep01", "dep02"]}
| 2 | {"name":"xyz", "depts":["dep03"],"sal":100}
| 3 | {"name":"pqr", "depts":["dep02"], "address":{"city":"SF","state":"CA"}}
预期输出
|id | name | depts | sal | address_city | address_state
| 1 | "abc" | ["dep01", "dep02"] | null| null | null
| 2 | "xyz" | ["dep03"] | 100 | null | null
| 3 | "pqr" | ["dep02"] | null| "SF" | "CA"
我知道我可以通过创建一个定义了架构的 StructType 并使用 'from_json' 方法来提取列。
但是这种方法需要手动定义模式。
val myStruct = StructType(
Seq(
StructField("name", StringType),
StructField("depts", ArrayType(StringType)),
StructField("sal", IntegerType)
))
var newDf = df.withColumn("depts", from_json(col("depts"), myStruct))
是否有更好的方法来展平 JSON 列而无需手动定义架构?
在提供的示例中,我可以看到可用的 JSON 字段。
但实际上,我无法遍历所有行来找到所有字段。
所以我正在寻找一种解决方案,将所有字段拆分为列,而无需指定列的名称或类型。
假设 json_data
是 map
类型(如果不是,您总是可以将其转换为 map
),您可以使用 getItem
:
df = spark.createDataFrame([
[1, {"name": "abc", "depts": ["dep01", "dep02"]}],
[2, {"name": "xyz", "depts": ["dep03"], "sal": 100}]
],
['id', 'json_data']
)
df.select(
df.id,
df.json_data.getItem('name').alias('name'),
df.json_data.getItem('depts').alias('depts'),
df.json_data.getItem('sal').alias('sal')
).show()
+---+----+--------------+----+
| id|name| depts| sal|
+---+----+--------------+----+
| 1| abc|[dep01, dep02]|null|
| 2| xyz| [dep03]| 100|
+---+----+--------------+----+
一种更动态的提取列的方法:
cols = ['name', 'depts', 'sal']
df.select(df.id, *(df.json_data.getItem(col).alias(col) for col in cols)).show()
如果它是一个 CSV
文件并且只有一列作为 JSON
数据出现。您可以使用以下解决方案。
val csvDF = spark.read.option("delimiter", "|").option("inferSchema", true).option("header", true).csv("test.csv")
val rdd = csvDF.select(" json_data").rdd.map(_.getString(0))
val ds = rdd.toDS
val jsonDF = spark.read.json(ds)
val jsonDFWithID = jsonDF.withColumn("id", monotonically_increasing_id())
val csvDFWithID = csvDF.select($"id ").withColumn("id", monotonically_increasing_id())
val joinDF = jsonDFWithID.join(csvDFWithID, "id").drop("id")
这是最终数据框的样子。
scala> joinDF.printSchema()
root
|-- address: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- state: string (nullable = true)
|-- depts: array (nullable = true)
| |-- element: string (containsNull = true)
|-- name: string (nullable = true)
|-- sal: long (nullable = true)
|-- id : double (nullable = true)
如果是 JSON
文件,则以下解决方案可行。
为了我。 inferSchema
工作得很好。
json 文件
~/Downloads ▶ cat test.json
{"id": 1, "name":"abc", "depts":["dep01", "dep02"]},
{"id": 2, "name":"xyz", "depts" :["dep03"],"sal":100}
代码
scala> scc.read.format("json").option("inerSchema", true).load("Downloads/test.json").show()
+--------------+---+----+----+
| depts| id|name| sal|
+--------------+---+----+----+
|[dep01, dep02]| 1| abc|null|
| [dep03]| 2| xyz| 100|
+--------------+---+----+----+
根据@Gaurang Shah 的回答,我已经实施了一个解决方案来处理嵌套 JSON 结构并解决了使用 monotonically_increasing_id(非顺序)
的问题
在这种方法中,'populateColumnName' 函数递归检查 StructType 列并填充列名。
'renameColumns' 函数通过替换“.”来重命名列用“_”标识嵌套的 json 个字段。
'addIndex' 函数在解析 JSON 列后向数据帧添加索引以加入数据帧。
def flattenJSON(df : DataFrame, columnName: String) : DataFrame = {
val indexCol = "internal_temp_id"
def populateColumnName(col : StructField) : Array[String] = {
col.dataType match {
case struct: StructType => struct.fields.flatMap(populateColumnName).map(col.name + "." + _)
case rest => Array(col.name)
}
}
def renameColumns(name : String) : String = {
if(name contains ".") {
name + " as " + name.replaceAll("\.", "_")
}
else name
}
def addIndex(df : DataFrame) : DataFrame = {
// Append "rowid" column of type Long
val newSchema = StructType(df.schema.fields ++ Array(StructField(indexCol, LongType, false)))
// Zip on RDD level
val rddWithId = df.rdd.zipWithIndex
// Convert back to DataFrame
spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
}
val dfWithID = addIndex(df)
val jsonDF = df.select(columnName)
val ds = jsonDF.rdd.map(_.getString(0)).toDS
val parseDF = spark.read.option("inferSchema",true).json(ds)
val columnNames = parseDF.schema.fields.flatMap(populateColumnName).map(renameColumns)
var resultDF = parseDF.selectExpr(columnNames:_*)
val jsonDFWithID = addIndex(resultDF)
val joinDF = dfWithID.join(jsonDFWithID, indexCol).drop(indexCol)
joinDF
}
val res = flattenJSON(jsonDF, "address")
我正在寻找一种通用解决方案,以将所有 json 字段提取为 JSON 字符串列中的列。
df = spark.read.load(path)
df.show()
'path'中文件的文件格式为parquet
示例数据
|id | json_data
| 1 | {"name":"abc", "depts":["dep01", "dep02"]}
| 2 | {"name":"xyz", "depts":["dep03"],"sal":100}
| 3 | {"name":"pqr", "depts":["dep02"], "address":{"city":"SF","state":"CA"}}
预期输出
|id | name | depts | sal | address_city | address_state
| 1 | "abc" | ["dep01", "dep02"] | null| null | null
| 2 | "xyz" | ["dep03"] | 100 | null | null
| 3 | "pqr" | ["dep02"] | null| "SF" | "CA"
我知道我可以通过创建一个定义了架构的 StructType 并使用 'from_json' 方法来提取列。
但是这种方法需要手动定义模式。
val myStruct = StructType(
Seq(
StructField("name", StringType),
StructField("depts", ArrayType(StringType)),
StructField("sal", IntegerType)
))
var newDf = df.withColumn("depts", from_json(col("depts"), myStruct))
是否有更好的方法来展平 JSON 列而无需手动定义架构? 在提供的示例中,我可以看到可用的 JSON 字段。 但实际上,我无法遍历所有行来找到所有字段。
所以我正在寻找一种解决方案,将所有字段拆分为列,而无需指定列的名称或类型。
假设 json_data
是 map
类型(如果不是,您总是可以将其转换为 map
),您可以使用 getItem
:
df = spark.createDataFrame([
[1, {"name": "abc", "depts": ["dep01", "dep02"]}],
[2, {"name": "xyz", "depts": ["dep03"], "sal": 100}]
],
['id', 'json_data']
)
df.select(
df.id,
df.json_data.getItem('name').alias('name'),
df.json_data.getItem('depts').alias('depts'),
df.json_data.getItem('sal').alias('sal')
).show()
+---+----+--------------+----+
| id|name| depts| sal|
+---+----+--------------+----+
| 1| abc|[dep01, dep02]|null|
| 2| xyz| [dep03]| 100|
+---+----+--------------+----+
一种更动态的提取列的方法:
cols = ['name', 'depts', 'sal']
df.select(df.id, *(df.json_data.getItem(col).alias(col) for col in cols)).show()
如果它是一个 CSV
文件并且只有一列作为 JSON
数据出现。您可以使用以下解决方案。
val csvDF = spark.read.option("delimiter", "|").option("inferSchema", true).option("header", true).csv("test.csv")
val rdd = csvDF.select(" json_data").rdd.map(_.getString(0))
val ds = rdd.toDS
val jsonDF = spark.read.json(ds)
val jsonDFWithID = jsonDF.withColumn("id", monotonically_increasing_id())
val csvDFWithID = csvDF.select($"id ").withColumn("id", monotonically_increasing_id())
val joinDF = jsonDFWithID.join(csvDFWithID, "id").drop("id")
这是最终数据框的样子。
scala> joinDF.printSchema()
root
|-- address: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- state: string (nullable = true)
|-- depts: array (nullable = true)
| |-- element: string (containsNull = true)
|-- name: string (nullable = true)
|-- sal: long (nullable = true)
|-- id : double (nullable = true)
如果是 JSON
文件,则以下解决方案可行。
为了我。 inferSchema
工作得很好。
json 文件
~/Downloads ▶ cat test.json
{"id": 1, "name":"abc", "depts":["dep01", "dep02"]},
{"id": 2, "name":"xyz", "depts" :["dep03"],"sal":100}
代码
scala> scc.read.format("json").option("inerSchema", true).load("Downloads/test.json").show()
+--------------+---+----+----+
| depts| id|name| sal|
+--------------+---+----+----+
|[dep01, dep02]| 1| abc|null|
| [dep03]| 2| xyz| 100|
+--------------+---+----+----+
根据@Gaurang Shah 的回答,我已经实施了一个解决方案来处理嵌套 JSON 结构并解决了使用 monotonically_increasing_id(非顺序)
的问题在这种方法中,'populateColumnName' 函数递归检查 StructType 列并填充列名。
'renameColumns' 函数通过替换“.”来重命名列用“_”标识嵌套的 json 个字段。
'addIndex' 函数在解析 JSON 列后向数据帧添加索引以加入数据帧。
def flattenJSON(df : DataFrame, columnName: String) : DataFrame = {
val indexCol = "internal_temp_id"
def populateColumnName(col : StructField) : Array[String] = {
col.dataType match {
case struct: StructType => struct.fields.flatMap(populateColumnName).map(col.name + "." + _)
case rest => Array(col.name)
}
}
def renameColumns(name : String) : String = {
if(name contains ".") {
name + " as " + name.replaceAll("\.", "_")
}
else name
}
def addIndex(df : DataFrame) : DataFrame = {
// Append "rowid" column of type Long
val newSchema = StructType(df.schema.fields ++ Array(StructField(indexCol, LongType, false)))
// Zip on RDD level
val rddWithId = df.rdd.zipWithIndex
// Convert back to DataFrame
spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
}
val dfWithID = addIndex(df)
val jsonDF = df.select(columnName)
val ds = jsonDF.rdd.map(_.getString(0)).toDS
val parseDF = spark.read.option("inferSchema",true).json(ds)
val columnNames = parseDF.schema.fields.flatMap(populateColumnName).map(renameColumns)
var resultDF = parseDF.selectExpr(columnNames:_*)
val jsonDFWithID = addIndex(resultDF)
val joinDF = dfWithID.join(jsonDFWithID, indexCol).drop(indexCol)
joinDF
}
val res = flattenJSON(jsonDF, "address")