Writing out a Spark DataFrame as a nested JSON document
I have a Spark DataFrame:
A B val_of_B val1 val2 val3 val4
"c1" "MCC" "cd1" 1 2 1.1 1.05
"c1" "MCC" "cd2" 2 3 1.1 1.05
"c1" "MCC" "cd3" 3 4 1.1 1.05
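val1 and val2 are obtained by grouping by A, B and val_of_B, whereas val3 and val4 are A-level information only (for example, for each distinct A there is a single val3: "c1" has 1.1).
I would like to write this out as nested JSON. For each A, the JSON should look like this: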
{"val3": 1.1, "val4": 1.05, "MCC":[["cd1",1,2], ["cd2",2,3], ["cd3",3,4]]}
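Is it possible to do this with the existing tools in the Spark API? If not, can you provide some guidance?
You should groupBy on column A and aggregate the necessary columns using the first, collect_list and array built-in functions: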
import org.apache.spark.sql.functions._
// Prepend each val_of_B entry to its matching [val1, val2] pair.
def zipping = udf((arr1: Seq[String], arr2: Seq[Seq[String]]) =>
  arr1.indices.map(index => Array(arr1(index)) ++ arr2(index)))
val jsonDF = df.groupBy("A")
  .agg(first(col("val3")).as("val3"), first(col("val4")).as("val4"), first(col("B")).as("B"),
    collect_list("val_of_B").as("val_of_B"), collect_list(array("val1", "val2")).as("list"))
  .select(col("val3"), col("val4"), col("B"), zipping(col("val_of_B"), col("list")).as("list"))
  .toJSON
which should give you
+-----------------------------------------------------------------------------------------------+
|value |
+-----------------------------------------------------------------------------------------------+
|{"val3":"1.1","val4":"1.05","B":"MCC","list":[["cd1","1","2"],["cd2","2","3"],["cd3","3","4"]]}|
+-----------------------------------------------------------------------------------------------+
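To make the zipping step concrete, here is a minimal sketch in plain Scala of what the UDF body does for a single group. It needs no Spark session; `ZippingSketch` and `zipRows` are hypothetical names used only for illustration, and the two sequences stand in for the `val_of_B` and `list` columns produced by `collect_list`.

```scala
object ZippingSketch {
  // Same logic as the zipping UDF: prepend each val_of_B entry
  // to the [val1, val2] pair found at the same index.
  def zipRows(codes: Seq[String], pairs: Seq[Seq[String]]): Seq[Seq[String]] =
    codes.indices.map(i => codes(i) +: pairs(i))

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-ins for the collected columns of group "c1".
    val codes = Seq("cd1", "cd2", "cd3")
    val pairs = Seq(Seq("1", "2"), Seq("2", "3"), Seq("3", "4"))
    zipRows(codes, pairs).foreach(row => println(row.mkString("[", ",", "]")))
  }
}
```

The sketch pairs entries purely by index, so it assumes both collected lists have the same length and the same row order.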
The next step is to use a udf function to replace the name list with the value of B, like so:
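// Note: this splits the JSON string on commas, so it relies on the field order produced above.
def exchangeName = udf((json: String) => {
  val splitted = json.split(",")
  val name = splitted(2).split(":")(1).trim   // the value of B, e.g. "MCC"
  val value = splitted(3).split(":")(1).trim  // the opening part of the list array
  splitted(0).trim + "," + splitted(1).trim + "," + name + ":" + value + "," +
    (4 until splitted.size).map(splitted(_)).mkString(",")
})
jsonDF.select(exchangeName(col("value")).as("json"))
  .show(false)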
This should give you the desired output
+------------------------------------------------------------------------------------+
|json |
+------------------------------------------------------------------------------------+
|{"val3":"1.1","val4":"1.05","MCC":[["cd1","1","2"],["cd2","2","3"],["cd3","3","4"]]}|
+------------------------------------------------------------------------------------+
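Because the renaming above depends on comma positions, a slightly less position-sensitive variant might use a regular expression instead. The following is only a sketch in plain Scala, not part of the original answer: `RenameKeySketch` is a hypothetical name, and it assumes the `"B"` field is immediately followed by the `"list"` field and that the value of B contains no double quotes.

```scala
object RenameKeySketch {
  // Matches  "B":"<name>","list":  and captures <name>.
  private val pattern = """"B":"([^"]*)","list":""".r

  // Replace the B/list pair with a single key named after the value of B.
  def exchangeName(json: String): String =
    pattern.replaceFirstIn(json, "\"$1\":")

  def main(args: Array[String]): Unit = {
    val in = """{"val3":"1.1","val4":"1.05","B":"MCC","list":[["cd1","1","2"]]}"""
    println(exchangeName(in))
  }
}
```

The same function body could be wrapped in `udf(...)` and applied to the `value` column in the same way as `exchangeName` above.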