How to use a Dataset to groupBy
I have a requirement that I currently implement with an RDD:
val test = Seq(("New York", "Jack"),
("Los Angeles", "Tom"),
("Chicago", "David"),
("Houston", "John"),
("Detroit", "Michael"),
("Chicago", "Andrew"),
("Detroit", "Peter"),
("Detroit", "George")
)
sc.parallelize(test).groupByKey().mapValues(_.toList).foreach(println)
The result is:
(New York,List(Jack))
(Detroit,List(Michael, Peter, George))
(Los Angeles,List(Tom))
(Houston,List(John))
(Chicago,List(David, Andrew))
How can I do this with a Dataset in Spark 2.0? I know how to do it with a custom function, but it feels complicated. Is there a simpler way?
First I would turn your RDD into a Dataset:
val spark: org.apache.spark.sql.SparkSession = ???
import spark.implicits._
val testDs = test.toDS()
Here you can get your column names :) Use them wisely!
testDs.schema.fields.foreach(x => println(x))
Finally, all you need is a groupBy:
testDs.groupBy("City?", "Name?")
RDDs are not the way to go in Spark 2.0, in my opinion. If you have any questions, just ask.
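Since the Dataset here is built from plain tuples, the schema print above will show the default column names _1 and _2. A minimal sketch that reproduces the RDD result, assuming those default names:
import org.apache.spark.sql.functions.collect_list

// group by the city column ("_1") and collect the names ("_2") per group
testDs.groupBy("_1")
  .agg(collect_list("_2") as "names")
  .show(false)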
I would recommend you start by creating a case class, as
case class Monkey(city: String, firstName: String)
This case class should be defined outside of your main class. Then you can just use the toDS function, and use groupBy with the aggregation function collect_list, as below:
import spark.implicits._
import org.apache.spark.sql.functions._
val test = Seq(("New York", "Jack"),
("Los Angeles", "Tom"),
("Chicago", "David"),
("Houston", "John"),
("Detroit", "Michael"),
("Chicago", "Andrew"),
("Detroit", "Peter"),
("Detroit", "George")
)
sc.parallelize(test)
.map(row => Monkey(row._1, row._2))
.toDS()
.groupBy("city")
.agg(collect_list("firstName") as "list")
.show(false)
The output you will get is:
+-----------+------------------------+
|city |list |
+-----------+------------------------+
|Los Angeles|[Tom] |
|Detroit |[Michael, Peter, George]|
|Chicago |[David, Andrew] |
|Houston |[John] |
|New York |[Jack] |
+-----------+------------------------+
You can always convert back to an RDD by calling the .rdd function.
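For example, a minimal sketch reusing the snippet above (the val names here are just illustrative):
// keep a handle on the aggregated result instead of calling .show
val aggregated = sc.parallelize(test)
  .map(row => Monkey(row._1, row._2))
  .toDS()
  .groupBy("city")
  .agg(collect_list("firstName") as "list")

// .rdd gives you back an RDD[org.apache.spark.sql.Row]
aggregated.rdd.foreach(println)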
To create a Dataset, first define a case class outside of your class, as
case class Employee(city: String, name: String)
Then you can convert your list to a Dataset:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("test").getOrCreate()
import spark.implicits._
val test = Seq(("New York", "Jack"),
("Los Angeles", "Tom"),
("Chicago", "David"),
("Houston", "John"),
("Detroit", "Michael"),
("Chicago", "Andrew"),
("Detroit", "Peter"),
("Detroit", "George")
).toDF("city", "name")
val data = test.as[Employee]
Or:
import spark.implicits._
val test = Seq(("New York", "Jack"),
("Los Angeles", "Tom"),
("Chicago", "David"),
("Houston", "John"),
("Detroit", "Michael"),
("Chicago", "Andrew"),
("Detroit", "Peter"),
("Detroit", "George")
)
val data = test.map(r => Employee(r._1, r._2)).toDS()
Now you can groupBy and perform any aggregation:
import org.apache.spark.sql.functions.collect_list

data.groupBy("city").count().show
data.groupBy("city").agg(collect_list("name")).show
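If you prefer the typed Dataset API over the untyped groupBy, a sketch of the same grouping with groupByKey (assuming the Employee case class and spark.implicits._ from above are in scope):
// typed grouping: key by city, then collect the names within each group
val byCity = data.groupByKey(_.city)
  .mapGroups((city, employees) => (city, employees.map(_.name).toList))
byCity.show(false)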
Hope this helps!