Spark parse nested json with variable json keys
I have the following structure:
root
|-- groups: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- programs: struct (nullable = true)
| | | |-- **{ program id }**: struct (nullable = true)
| | | | |-- Date: timestamp (nullable = true)
| | | | |-- Name: string (nullable = true)
| | | | |-- Some_Flags: struct (nullable = true)
| | | | | |-- abc: boolean (nullable = true)
| | | | | |-- def: boolean (nullable = true)
| | | | | |-- ghi: boolean (nullable = true)
| | | | | |-- xyz: boolean (nullable = true)
"groups" : [
  {
    … some other fields …
    "programs" : {
      "123c12b123456c1d76a4f265f10f20a0" : {
        "name" : "test_program_1",
        "some_flags" : {
          "abc" : true,
          "def" : true,
          "ghi" : false,
          "xyz" : true
        },
        "date" : ISODate("2019-11-16T03:29:00.000+0000")
      }
    }
  }
]
val data = spark.read.json("path").map(customParser)
How do I use a custom parser to map to a case class?
The data comes from a MongoDB database. It needs to be parsed in a distributed way so that I can iterate over every row.

Since the JSON documents have a variable key (the program id is not a constant key but changes with every entry), Spark cannot infer the schema. One option is to process the documents manually:
The case classes:
case class SomeFlags(abc: Boolean, def1: Boolean, ghi: Boolean, xyz: Boolean)
case class Program(var programId: String, date: String, name: String, someFlags: SomeFlags)
case class Group(programs: Array[Program])
case class Groups(groups: Array[Group])
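For orientation, this is roughly the value the parser should produce for the example document above (a sketch with the sample values from the JSON; "def" is a reserved word in Scala, hence the field name def1, and the exact date string depends on how the Mongo export serializes it):

Groups(Array(
  Group(Array(
    Program(
      programId = "123c12b123456c1d76a4f265f10f20a0",
      date      = "2019-11-16T03:29:00.000+0000",
      name      = "test_program_1",
      someFlags = SomeFlags(abc = true, def1 = true, ghi = false, xyz = true)
    )
  ))
))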
Companion objects that extract the data fields from the parsed JSON:
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

object Groups {
  def unapply(values: Map[String, Object]) = try {
    val groups = values("groups").asInstanceOf[List[Map[String, Object]]]
    val grps = new ListBuffer[Group]()
    for (group <- groups) {
      val Group(grp) = group
      grps += grp
    }
    Some(Groups(Array(grps: _*)))
  } catch {
    case NonFatal(ex) =>
      println(ex)
      None
  }
}

object Group {
  def unapply(values: Map[String, Object]) = try {
    val programs = values("programs").asInstanceOf[Map[String, Object]]
    val prgs = new ListBuffer[Program]()
    for ((k, v) <- programs) {
      // the variable json key k becomes the programId field
      val Program(prg) = v.asInstanceOf[Map[String, Object]]
      prg.programId = k
      prgs += prg
    }
    Some(Group(Array(prgs: _*)))
  } catch {
    case NonFatal(ex) =>
      println(ex)
      None
  }
}

object Program {
  def unapply(values: Map[String, Object]) = try {
    val SomeFlags(flags) = values("some_flags").asInstanceOf[Map[String, Object]]
    Some(Program("pid", values("date").asInstanceOf[String], values("name").asInstanceOf[String], flags))
  } catch {
    case NonFatal(ex) =>
      println(ex)
      None
  }
}

object SomeFlags {
  def unapply(values: Map[String, Object]) = try {
    Some(SomeFlags(values("abc").asInstanceOf[Boolean], values("def").asInstanceOf[Boolean], values("ghi").asInstanceOf[Boolean], values("xyz").asInstanceOf[Boolean]))
  } catch {
    case NonFatal(ex) =>
      println(ex)
      None
  }
}
The key part is inside Group.unapply, where prg.programId is manually set to the key of the map that contains all the programs.
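The extractors can be checked locally before wiring them into Spark. A minimal sketch (the JSON string is a hypothetical one-line rendering of the example document; Mongo's ISODate(...) wrapper is not valid JSON, so date is a plain string here):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}

val json =
  """{"groups":[{"programs":{"123c12b123456c1d76a4f265f10f20a0":
    |{"name":"test_program_1","some_flags":{"abc":true,"def":true,"ghi":false,"xyz":true},
    |"date":"2019-11-16T03:29:00.000+0000"}}}]}""".stripMargin

val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)

// the Groups extractor drives the whole chain: Groups -> Group -> Program -> SomeFlags
val Groups(parsed) = mapper.readValue[Map[String, Object]](json)
parsed.groups.head.programs.foreach(p => println(s"${p.programId} -> ${p.name}"))
// prints: 123c12b123456c1d76a4f265f10f20a0 -> test_program_1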
Finally the Spark code. DataFrameReader.textFile is used to read the file (each line should contain one complete JSON document). The result is a Dataset[String]; any other data source that yields a dataframe with one complete JSON document per row would work as well.
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import org.apache.spark.sql.Dataset
import spark.implicits._

val ds: Dataset[String] = spark.read.textFile(<path to file>)

val ds2: Dataset[Groups] = ds.map(s => {
  // the mapper is created once per record here; see the mapPartitions note below
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  val obj = mapper.readValue[Map[String, Object]](s)
  val Groups(groups) = obj
  groups
})
ds2 now has the schema:
root
|-- groups: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- programs: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- programId: string (nullable = true)
| | | | |-- date: string (nullable = true)
| | | | |-- name: string (nullable = true)
| | | | |-- someFlags: struct (nullable = true)
| | | | | |-- abc: boolean (nullable = false)
| | | | | |-- def1: boolean (nullable = false)
| | | | | |-- ghi: boolean (nullable = false)
| | | | | |-- xyz: boolean (nullable = false)
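With the data in this shape the variable keys are gone, so regular DataFrame operations work. A sketch that flattens the result to one row per program (assumes the ds2 value from above; the $-syntax comes from the spark.implicits._ import):

import org.apache.spark.sql.functions.explode

val flattened = ds2
  .select(explode($"groups").as("group"))
  .select(explode($"group.programs").as("program"))
  .select($"program.programId", $"program.name", $"program.date", $"program.someFlags.abc")

flattened.show(truncate = false)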
Things to improve:
- better error handling in the unapply methods
- replace the map function with mapPartitions to improve performance (see the sketch below)
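A minimal sketch of the mapPartitions variant mentioned above, which also tightens the error handling a little: the ObjectMapper is created once per partition instead of once per record, and lines that cannot be parsed are skipped instead of failing the task (assumes the ds value and the Jackson imports from above):

import scala.util.Try

val ds3: Dataset[Groups] = ds.mapPartitions { lines =>
  // one mapper per partition instead of one per record
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)

  lines.flatMap { line =>
    // Try covers malformed JSON, Groups.unapply covers missing or unexpected fields;
    // in both cases the line is simply dropped
    Try(mapper.readValue[Map[String, Object]](line)).toOption.flatMap(Groups.unapply)
  }
}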