如何在 Apache Spark 中爆炸 get_json_object
How to explode get_json_object in Apache Spark
我的数据框的一列中有以下字符串:
row1:[{"key":"foo"},{"key":"bar"},{"key":"baz"}]
row2:[{"key":"foo"},{"key":"bar"}]
row3:null
etc
我发现 Spark 有 "get_json_object" 功能。因此,如果我想使用 xpath 提取数据,我会使用:
get_json_object($"json", s"$[0].key")
会 returns:
"foo"
"foo"
null
但我需要相当于 "explosion" 的 Spark 函数。
我发现我可以在我的 xpath 中使用“*”符号。
get_json_object($"json", s"$[*].key")
它没有按预期执行,它会创建一个字符串,如:
[foo,bar,baz]
[foo,baz]
我在另一个 Whosebug 线程中找到了解决方案,
val jsonElements = (0 until 3).map(i => get_json_object($"json", s"$$[$i].key"))
val jsonElements = .map(i => get_json_object($"json", s"$$[$i].key"))
df.select($"id",explode(array(jsonElements: _*).alias("foo")))
这部分解决了我的问题,因为这个解决方案假定我知道我的阵列的最大深度。 Spark 的函数 "from_json" 需要模式,我有非常复杂的 JSON 类型,需要花费 "infinity" 的时间来创建模式。
免责声明
我不会使用任何正则 expression/substring/etc 来解析 JSON。使用解析器的全部建议是。
只要坚持scala基础就可以很简单的解决。尝试案例 类 以及解决问题的选项。
您可以使用任何标准 json 解析器。我用 liftweb.
import net.liftweb.json.{DefaultFormats, parseOpt}
case class jsonElement(key: String, value: Optional[String])
//assuming the value key always exists and value may or may not exist,
//so making that as optional / ignore the fields if you don't really care at all
val jsonKeys = inputRdd.map(eachRow =>
implicit val formats = DefaultFormats // hate this but deal with scala
val parsedObject = parseOpt(eachRow).flatMap(_.extractOpt[List[jsonElement]])
parsedObject match{
case Some(parsedItem) => parsedItem.map(json => json.key)
case None => List()
})
这给出了 list(key) 的 Rdd。如果要删除空列表,请使用 filter(list => !list.isEmpty) 。你从那里知道的。
此解决方案回答了您的问题,您可以使用 Spark 推断模式一次,然后再使用该模式。
我的数据框的一列中有以下字符串:
row1:[{"key":"foo"},{"key":"bar"},{"key":"baz"}]
row2:[{"key":"foo"},{"key":"bar"}]
row3:null
etc
我发现 Spark 有 "get_json_object" 功能。因此,如果我想使用 xpath 提取数据,我会使用:
get_json_object($"json", s"$[0].key")
会 returns:
"foo"
"foo"
null
但我需要相当于 "explosion" 的 Spark 函数。
我发现我可以在我的 xpath 中使用“*”符号。
get_json_object($"json", s"$[*].key")
它没有按预期执行,它会创建一个字符串,如:
[foo,bar,baz]
[foo,baz]
我在另一个 Whosebug 线程中找到了解决方案,
val jsonElements = (0 until 3).map(i => get_json_object($"json", s"$$[$i].key"))
val jsonElements = .map(i => get_json_object($"json", s"$$[$i].key"))
df.select($"id",explode(array(jsonElements: _*).alias("foo")))
这部分解决了我的问题,因为这个解决方案假定我知道我的阵列的最大深度。 Spark 的函数 "from_json" 需要模式,我有非常复杂的 JSON 类型,需要花费 "infinity" 的时间来创建模式。
免责声明
我不会使用任何正则 expression/substring/etc 来解析 JSON。使用解析器的全部建议是。
只要坚持scala基础就可以很简单的解决。尝试案例 类 以及解决问题的选项。
您可以使用任何标准 json 解析器。我用 liftweb.
import net.liftweb.json.{DefaultFormats, parseOpt}
case class jsonElement(key: String, value: Optional[String])
//assuming the value key always exists and value may or may not exist,
//so making that as optional / ignore the fields if you don't really care at all
val jsonKeys = inputRdd.map(eachRow =>
implicit val formats = DefaultFormats // hate this but deal with scala
val parsedObject = parseOpt(eachRow).flatMap(_.extractOpt[List[jsonElement]])
parsedObject match{
case Some(parsedItem) => parsedItem.map(json => json.key)
case None => List()
})
这给出了 list(key) 的 Rdd。如果要删除空列表,请使用 filter(list => !list.isEmpty) 。你从那里知道的。
此解决方案回答了您的问题,您可以使用 Spark 推断模式一次,然后再使用该模式。