Better/Efficient 从 Spark Table 解析嵌套 JSON 列的方法
Better/Efficient Ways to Parsing Nested JSON Column from Spark Table
我是 Spark 和 Scala 的新手。我正在尝试从 Spark Table 解析嵌套 JSON 格式的列。这是 table 的先睹为快(我只展示了 Spark Table 的第一行,其余部分看起来都一样)
doc.show(1)
doc_content object_id object_version
{"id":"lni001","pub_date".... 20220301 7098727
"doc_content" 列的每一行的结构如下所示(某些行可能在 'content' 字段):
{
"id":"lni001",
"pub_date":"20220301",
"doc_id":"7098727",
"unique_id":"64WP-UI-POLI",
"content":[
{
"c_id":"002",
"p_id":"P02",
"type":"org",
"source":"internet"
},
{
"c_id":"003",
"p_id":"P03",
"type":"org",
"source":"internet"
},
{
"c_id":"005",
"p_id":"K01",
"type":"people",
"source":"news"
}
]
}
我尝试在“doc_content”列上使用 explode
doc.select(explode($"doc_content") as "doc_content")
.withColumn("id", col("doc_info.id"))
.withColumn("pub_date", col("doc_info.pub_date"))
.withColumn("doc_id", col("doc_info.doc_id"))
.withColumn("unique_id", col("doc_info.unique_id"))
.withColumn("content", col("doc_info.content"))
.withColumn("content", explode($"content"))
.withColumn("c_id", col("content.c_id"))
.withColumn("p_id", col("content.p_id"))
.withColumn("type", col("content.type"))
.withColumn("source", col("content.source"))
.drop(col("doc_content"))
.drop(col("content"))
.show()
但是我得到了这个错误 org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`doc_content`)' due to data type mismatch: input to function explode should be array or map type, not string;
。我正在努力将列转换为 Array 或 Map 类型(可能是 Scala LOL 的新手)。
解析“doc_content”列后,我希望 table 看起来像这样。
id pub_date doc_id unique_id c_id p_id type source oject_id object_version
lni001 20220301 7098727 64WP-UI-POLI 002 P02 org internet 20220301 7098727
lni001 20220301 7098727 64WP-UI-POLI 003 P03 org internet 20220301 7098727
lni001 20220301 7098727 64WP-UI-POLI 005 K01 people news 20220301 7098727
我想知道我该如何做到这一点,如果我能得到一些关于如何做到这一点的想法或方法,那就太好了。或者可能是比我的方法更好的方法,因为我在 Spark Table 中有数百万行,如果我能让它 运行 更快的话。
谢谢!
您可以使用 from_json
将 JSON 字符串解析为 MapType,然后在数组列上使用 explode
来创建新行,这意味着您应该在 doc_content.content
比 doc_content
.
指定用于解析 json 字符串的模式:
import org.apache.spark.sql.types._
val schema = new StructType()
.add("id", StringType)
.add("pub_date", StringType)
.add("doc_id", StringType)
.add("unique_id", StringType)
.add("content", ArrayType(MapType(StringType, StringType)))
然后解析 json 字符串并展开它
df.select(
$"object_id",
$"object_version",
from_json($"doc_content", schema).alias("doc_content")
).select(
$"object_id",
$"object_version",
col("doc_content.id").alias("id"),
col("doc_content.pub_date").alias("pub_date"),
col("doc_content.doc_id").alias("doc_id"),
col("doc_content.unique_id").alias("unique_id"),
explode(col("doc_content.content")).alias("content")
).select(
$"id",
$"pub_date",
$"doc_id",
$"unique_id",
col("content.c_id").alias("c_id"),
col("content.p_id").alias("p_id"),
col("content.type").alias("type"),
col("content.source").alias("source"),
$"object_id",
$"object_version"
)
我是 Spark 和 Scala 的新手。我正在尝试从 Spark Table 解析嵌套 JSON 格式的列。这是 table 的先睹为快(我只展示了 Spark Table 的第一行,其余部分看起来都一样)
doc.show(1)
doc_content object_id object_version
{"id":"lni001","pub_date".... 20220301 7098727
"doc_content" 列的每一行的结构如下所示(某些行可能在 'content' 字段):
{
"id":"lni001",
"pub_date":"20220301",
"doc_id":"7098727",
"unique_id":"64WP-UI-POLI",
"content":[
{
"c_id":"002",
"p_id":"P02",
"type":"org",
"source":"internet"
},
{
"c_id":"003",
"p_id":"P03",
"type":"org",
"source":"internet"
},
{
"c_id":"005",
"p_id":"K01",
"type":"people",
"source":"news"
}
]
}
我尝试在“doc_content”列上使用 explode
doc.select(explode($"doc_content") as "doc_content")
.withColumn("id", col("doc_info.id"))
.withColumn("pub_date", col("doc_info.pub_date"))
.withColumn("doc_id", col("doc_info.doc_id"))
.withColumn("unique_id", col("doc_info.unique_id"))
.withColumn("content", col("doc_info.content"))
.withColumn("content", explode($"content"))
.withColumn("c_id", col("content.c_id"))
.withColumn("p_id", col("content.p_id"))
.withColumn("type", col("content.type"))
.withColumn("source", col("content.source"))
.drop(col("doc_content"))
.drop(col("content"))
.show()
但是我得到了这个错误 org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`doc_content`)' due to data type mismatch: input to function explode should be array or map type, not string;
。我正在努力将列转换为 Array 或 Map 类型(可能是 Scala LOL 的新手)。
解析“doc_content”列后,我希望 table 看起来像这样。
id pub_date doc_id unique_id c_id p_id type source oject_id object_version
lni001 20220301 7098727 64WP-UI-POLI 002 P02 org internet 20220301 7098727
lni001 20220301 7098727 64WP-UI-POLI 003 P03 org internet 20220301 7098727
lni001 20220301 7098727 64WP-UI-POLI 005 K01 people news 20220301 7098727
我想知道我该如何做到这一点,如果我能得到一些关于如何做到这一点的想法或方法,那就太好了。或者可能是比我的方法更好的方法,因为我在 Spark Table 中有数百万行,如果我能让它 运行 更快的话。
谢谢!
您可以使用 from_json
将 JSON 字符串解析为 MapType,然后在数组列上使用 explode
来创建新行,这意味着您应该在 doc_content.content
比 doc_content
.
指定用于解析 json 字符串的模式:
import org.apache.spark.sql.types._
val schema = new StructType()
.add("id", StringType)
.add("pub_date", StringType)
.add("doc_id", StringType)
.add("unique_id", StringType)
.add("content", ArrayType(MapType(StringType, StringType)))
然后解析 json 字符串并展开它
df.select(
$"object_id",
$"object_version",
from_json($"doc_content", schema).alias("doc_content")
).select(
$"object_id",
$"object_version",
col("doc_content.id").alias("id"),
col("doc_content.pub_date").alias("pub_date"),
col("doc_content.doc_id").alias("doc_id"),
col("doc_content.unique_id").alias("unique_id"),
explode(col("doc_content.content")).alias("content")
).select(
$"id",
$"pub_date",
$"doc_id",
$"unique_id",
col("content.c_id").alias("c_id"),
col("content.p_id").alias("p_id"),
col("content.type").alias("type"),
col("content.source").alias("source"),
$"object_id",
$"object_version"
)