Better/Efficient 从 Spark Table 解析嵌套 JSON 列的方法

Better/Efficient Ways to Parsing Nested JSON Column from Spark Table

我是 Spark 和 Scala 的新手。我正在尝试从 Spark Table 解析嵌套 JSON 格式的列。这是 table 的先睹为快(我只展示了 Spark Table 的第一行,其余部分看起来都一样)

doc.show(1)

  doc_content                      object_id      object_version       
{"id":"lni001","pub_date"....       20220301         7098727      

"doc_content" 列的每一行的结构如下所示(某些行可能在 'content' 字段):

{
   "id":"lni001",
   "pub_date":"20220301",
   "doc_id":"7098727",
   "unique_id":"64WP-UI-POLI",
   "content":[
      {
         "c_id":"002",
         "p_id":"P02",
         "type":"org",
         "source":"internet"  
      },
      {
         "c_id":"003",
         "p_id":"P03",
         "type":"org",
         "source":"internet" 
      },
      {
         "c_id":"005",
         "p_id":"K01",
         "type":"people",
         "source":"news" 
      }
   ]
}

我尝试在“doc_content”列上使用 explode

doc.select(explode($"doc_content") as "doc_content")
    .withColumn("id", col("doc_info.id"))
    .withColumn("pub_date", col("doc_info.pub_date"))
    .withColumn("doc_id", col("doc_info.doc_id"))
    .withColumn("unique_id", col("doc_info.unique_id"))
    .withColumn("content", col("doc_info.content"))
    .withColumn("content", explode($"content"))
    .withColumn("c_id", col("content.c_id"))
    .withColumn("p_id", col("content.p_id"))
    .withColumn("type", col("content.type"))
    .withColumn("source", col("content.source"))
    .drop(col("doc_content"))
    .drop(col("content"))
    .show()

但是我得到了这个错误 org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`doc_content`)' due to data type mismatch: input to function explode should be array or map type, not string;。我正在努力将列转换为 Array 或 Map 类型(可能是 Scala LOL 的新手)。

解析“doc_content”列后,我希望 table 看起来像这样。

  id          pub_date      doc_id       unique_id     c_id    p_id   type     source    oject_id     object_version
lni001        20220301      7098727     64WP-UI-POLI    002     P02    org    internet   20220301         7098727  
lni001        20220301      7098727     64WP-UI-POLI    003     P03    org    internet   20220301         7098727  
lni001        20220301      7098727     64WP-UI-POLI    005     K01   people    news     20220301         7098727  

我想知道我该如何做到这一点,如果我能得到一些关于如何做到这一点的想法或方法,那就太好了。或者可能是比我的方法更好的方法,因为我在 Spark Table 中有数百万行,如果我能让它 运行 更快的话。

谢谢!

您可以使用 from_json 将 JSON 字符串解析为 MapType,然后在数组列上使用 explode 来创建新行,这意味着您应该在 doc_content.contentdoc_content.

指定用于解析 json 字符串的模式:

import org.apache.spark.sql.types._
val schema = new StructType()
  .add("id", StringType)
  .add("pub_date", StringType)
  .add("doc_id", StringType)
  .add("unique_id", StringType)
  .add("content", ArrayType(MapType(StringType, StringType)))

然后解析 json 字符串并展开它

df.select(
  $"object_id",
  $"object_version",
  from_json($"doc_content", schema).alias("doc_content")
).select(
  $"object_id",
  $"object_version",
  col("doc_content.id").alias("id"),
  col("doc_content.pub_date").alias("pub_date"),
  col("doc_content.doc_id").alias("doc_id"),
  col("doc_content.unique_id").alias("unique_id"),
  explode(col("doc_content.content")).alias("content")
).select(
  $"id",
  $"pub_date",
  $"doc_id",
  $"unique_id",
  col("content.c_id").alias("c_id"),
  col("content.p_id").alias("p_id"),
  col("content.type").alias("type"),
  col("content.source").alias("source"),
  $"object_id",
  $"object_version"
)