Spark parse nested json with variable json keys

I have the following structure:

 |-- groups: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- programs: struct (nullable = true)
 |    |    |    |-- **{ program id }**: struct (nullable = true)
 |    |    |    |    |-- Date: timestamp (nullable = true)
 |    |    |    |    |-- Name: string (nullable = true)
 |    |    |    |    |-- Some_Flags: struct (nullable = true)
 |    |    |    |    |    |-- abc: boolean (nullable = true)
 |    |    |    |    |    |-- def: boolean (nullable = true)
 |    |    |    |    |    |-- ghi: boolean (nullable = true)
 |    |    |    |    |    |-- xyz: boolean (nullable = true)




"groups" : [
 {
  … some other fields …
  "programs" : {
     "123c12b123456c1d76a4f265f10f20a0" : {
        "name" : "test_program_1",
        "some_flags" : {
           "abc" : true,
           "def" : true,
           "ghi" : false,
           "xyz" : true
        },
        "date" : ISODate("2019-11-16T03:29:00.000+0000")
     }
  }
 }
]

val data = spark.read.json("path").map(customParser)

How do I use a custom parser to map the data to a case class?

The data comes from MongoDB. The parsing needs to happen in a distributed way so that I can iterate over every row.

Since the JSON documents have variable keys (the program id is not a constant key but changes with every entry), Spark cannot infer a usable schema. One option is to process the documents manually:

Case classes:

// `def` is a reserved word in Scala, hence the field name def1
case class SomeFlags(abc: Boolean, def1: Boolean, ghi: Boolean, xyz: Boolean)
// programId is a var so it can be filled in later from the enclosing map key
case class Program(var programId: String, date: String, name: String, someFlags: SomeFlags)
case class Group(programs: Array[Program])
case class Groups(groups: Array[Group])
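
For illustration only, the sample document from the question would land in these case classes roughly like this (values copied from the JSON snippet above; a sketch of the target shape, not part of the parser):

val sample = Groups(Array(
  Group(Array(
    Program(
      "123c12b123456c1d76a4f265f10f20a0",   // programId taken from the variable map key
      "2019-11-16T03:29:00.000+0000",       // date kept as a plain string
      "test_program_1",
      SomeFlags(abc = true, def1 = true, ghi = false, xyz = true)
    )
  ))
))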

Companion objects used to extract the data fields from the JSON string:


import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

object Groups {
  def unapply(values: Map[String, Object]) = try {
    val groups = values("groups").asInstanceOf[List[Map[String, Object]]]
    val grps = new ListBuffer[Group]()
    for (group <- groups) {
      val Group(grp) = group
      grps += grp
    }
    Some(Groups(Array(grps: _*)))
  } catch {
    case NonFatal(ex) => {
      println(ex)
      None
    }
  }
}
  
object Group {
  def unapply(values: Map[String, Object]) = try {
    val programs = values("programs").asInstanceOf[Map[String, Object]]
    val prgs = new ListBuffer[Program]()
    for ((k, v) <- programs) {
      val Program(prg) = v.asInstanceOf[Map[String, Object]]
      prg.programId = k
      prgs += prg
    }
    Some(Group(Array(prgs: _*)))
  } catch {
    case NonFatal(ex) => {
      println(ex)
      None
    }
  }
}

object Program {
  def unapply(values: Map[String, Object]) = try {
    val SomeFlags(flags) = values("some_flags").asInstanceOf[Map[String, Object]]
    Some(Program("pid", values("date").asInstanceOf[String], values("name").asInstanceOf[String], flags))
  } catch {
    case NonFatal(ex) => {
      println(ex)
      None
    }
  }
}

object SomeFlags {
  def unapply(values: Map[String, Object]) = try {
    Some(SomeFlags(values("abc").asInstanceOf[Boolean], values("def").asInstanceOf[Boolean], values("ghi").asInstanceOf[Boolean], values("xyz").asInstanceOf[Boolean]))
  } catch {
    case NonFatal(ex) => {
      println(ex)
      None
    }
  }
}

The crucial part here is inside Group.unapply, where prg.programId is manually set to the key of the map that contains all the programs.
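
To sanity-check these extractors without Spark, they can be run on a single document first (a minimal sketch, assuming the document sits in a string named json and jackson-module-scala is on the classpath):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}

val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)

val parsed = mapper.readValue[Map[String, Object]](json)
Groups.unapply(parsed) match {
  case Some(gs) => gs.groups.foreach(_.programs.foreach(p => println(s"${p.programId} -> ${p.name}")))
  case None     => println("document could not be parsed")
}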

Finally, the Spark code. DataFrameReader.textFile is used to read the file (each line should contain one complete JSON document). The result is a Dataset[String]; any other data source that produces a dataframe with one complete JSON document per row would work as well.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import org.apache.spark.sql.Dataset

import spark.implicits._   // provides the Encoder for the Groups case class

val ds: Dataset[String] = spark.read.textFile(<path to file>)

val ds2: Dataset[Groups] = ds.map(s => {
  // the ObjectMapper is created inside the map function, so it never has to be serialized
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  val obj = mapper.readValue[Map[String, Object]](s)
  val Groups(groups) = obj
  groups
})

ds2 now has the schema:

root
 |-- groups: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- programs: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- programId: string (nullable = true)
 |    |    |    |    |-- date: string (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- someFlags: struct (nullable = true)
 |    |    |    |    |    |-- abc: boolean (nullable = false)
 |    |    |    |    |    |-- def1: boolean (nullable = false)
 |    |    |    |    |    |-- ghi: boolean (nullable = false)
 |    |    |    |    |    |-- xyz: boolean (nullable = false)
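
From there the nested structure can be flattened with the usual DataFrame operations if needed, for example (a sketch using the column names from the schema above; requires spark.implicits._ and org.apache.spark.sql.functions.explode):

import org.apache.spark.sql.functions.explode

val programsDf = ds2
  .select(explode($"groups").as("group"))
  .select(explode($"group.programs").as("program"))
  .select($"program.programId", $"program.name", $"program.date", $"program.someFlags.abc")

programsDf.show(false)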

Things that could still be improved:

  • Better error handling in the unapply methods
  • Replacing map with mapPartitions for better performance (see the sketch below)
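
A mapPartitions version could look roughly like this (a sketch; the point is to create one ObjectMapper per partition instead of one per row):

val ds3: Dataset[Groups] = ds.mapPartitions(rows => {
  // reuse a single ObjectMapper for all rows in this partition
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  rows.map { s =>
    val obj = mapper.readValue[Map[String, Object]](s)
    val Groups(groups) = obj
    groups
  }
})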