使用 Scala 将数据集转换为 Json 数组 Spark
Converting DataSet to Json Array Spark using Scala
我是 spark 的新手,无法找出以下问题的解决方案。
我有一个 JSON 文件要解析,然后创建几个指标并将数据写回 JSON 格式。
下面是我正在使用的代码
import org.apache.spark.sql._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions._
object quick2 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder
.appName("quick1")
.master("local[*]")
.getOrCreate()
val rawData = spark.read.json("/home/umesh/Documents/Demo2/src/main/resources/sampleQuick.json")
val mat1 = rawData.select(rawData("mal_name"),rawData("cust_id")).distinct().orderBy("cust_id").toJSON.cache()
val mat2 = rawData.select(rawData("file_md5"),rawData("mal_name")).distinct().orderBy(asc("file_md5")).toJSON.cache()
val write1 = mat1.coalesce(1).toJavaRDD.saveAsTextFile("/home/umesh/Documents/Demo2/src/test/mat1/")
val write = mat2.coalesce(1).toJavaRDD.saveAsTextFile("/home/umesh/Documents/Demo2/src/test/mat2/")
}
}
现在上面的代码正在编写正确的 json 格式。
但是,矩阵也可以包含重复结果
示例:
md5 mal_name
1 a
1 b
2 c
3 d
3 e
所以上面的代码每个对象都写在一行中
像这样
{"file_md5":"1","mal_name":"a"}
{"file_md5":"1","mal_name":"b"}
{"file_md5":"2","mal_name":"c"}
{"file_md5":"3","mal_name":"d"}
等等。
但我想合并常用键的数据:
所以输出应该是
{"file_md5":"1","mal_name":["a","b"]}
有人可以建议我在这里做什么吗?或者如果有任何其他更好的方法来解决这个问题。
谢谢!
- You can use
collect_list
or collect_set
as per your need on mal_name
column
- You can directly save DataFrame/DataSet directly as JSON file
import org.apache.spark.sql.functions.{alias, collect_list}
import spark.implicits._
rawData.groupBy($"file_md5")
.agg(collect_set($"mal_name").alias("mal_name"))
.write
.format("json")
.save("json/file/location/to/save")
正如@mrsrinivas 所写,我按照以下内容更改了我的代码
val mat2 = rawData.select(rawData("file_md5"),rawData("mal_name")).distinct().orderBy(asc("file_md5")).cache()
val labeledDf = mat2.toDF("file_md5","mal_name")
labeledDf.groupBy($"file_md5").agg(collect_list($"mal_name")).coalesce(1).write.format("json").save("/home/umesh/Documents/Demo2/src/test/run8/")
如果有更多建议,请保持此问题开放。
我是 spark 的新手,无法找出以下问题的解决方案。
我有一个 JSON 文件要解析,然后创建几个指标并将数据写回 JSON 格式。
下面是我正在使用的代码
import org.apache.spark.sql._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions._
object quick2 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder
.appName("quick1")
.master("local[*]")
.getOrCreate()
val rawData = spark.read.json("/home/umesh/Documents/Demo2/src/main/resources/sampleQuick.json")
val mat1 = rawData.select(rawData("mal_name"),rawData("cust_id")).distinct().orderBy("cust_id").toJSON.cache()
val mat2 = rawData.select(rawData("file_md5"),rawData("mal_name")).distinct().orderBy(asc("file_md5")).toJSON.cache()
val write1 = mat1.coalesce(1).toJavaRDD.saveAsTextFile("/home/umesh/Documents/Demo2/src/test/mat1/")
val write = mat2.coalesce(1).toJavaRDD.saveAsTextFile("/home/umesh/Documents/Demo2/src/test/mat2/")
}
}
现在上面的代码正在编写正确的 json 格式。 但是,矩阵也可以包含重复结果 示例:
md5 mal_name
1 a
1 b
2 c
3 d
3 e
所以上面的代码每个对象都写在一行中
像这样
{"file_md5":"1","mal_name":"a"}
{"file_md5":"1","mal_name":"b"}
{"file_md5":"2","mal_name":"c"}
{"file_md5":"3","mal_name":"d"}
等等。
但我想合并常用键的数据:
所以输出应该是
{"file_md5":"1","mal_name":["a","b"]}
有人可以建议我在这里做什么吗?或者如果有任何其他更好的方法来解决这个问题。
谢谢!
- You can use
collect_list
orcollect_set
as per your need onmal_name
column- You can directly save DataFrame/DataSet directly as JSON file
import org.apache.spark.sql.functions.{alias, collect_list}
import spark.implicits._
rawData.groupBy($"file_md5")
.agg(collect_set($"mal_name").alias("mal_name"))
.write
.format("json")
.save("json/file/location/to/save")
正如@mrsrinivas 所写,我按照以下内容更改了我的代码
val mat2 = rawData.select(rawData("file_md5"),rawData("mal_name")).distinct().orderBy(asc("file_md5")).cache()
val labeledDf = mat2.toDF("file_md5","mal_name")
labeledDf.groupBy($"file_md5").agg(collect_list($"mal_name")).coalesce(1).write.format("json").save("/home/umesh/Documents/Demo2/src/test/run8/")
如果有更多建议,请保持此问题开放。