How to use a Dataset to groupBy

I have a requirement that I currently implement with an RDD:

val test = Seq(("New York", "Jack"),
    ("Los Angeles", "Tom"),
    ("Chicago", "David"),
    ("Houston", "John"),
    ("Detroit", "Michael"),
    ("Chicago", "Andrew"),
    ("Detroit", "Peter"),
    ("Detroit", "George")
  )
sc.parallelize(test).groupByKey().mapValues(_.toList).foreach(println)

The result is:

(New York,List(Jack))
(Detroit,List(Michael, Peter, George))
(Los Angeles,List(Tom))
(Houston,List(John))
(Chicago,List(David, Andrew))

How can I do the same with a Dataset in Spark 2.0?

There is a way using a custom function, but it feels quite complicated. Is there a simpler way?
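For reference, the "custom function" route I had in mind looks roughly like this (just a sketch, assuming a SparkSession named spark and import spark.implicits._ in scope):

val testDs = test.toDS()

testDs
  .groupByKey(_._1)                                           // group by city
  .mapGroups((city, rows) => (city, rows.map(_._2).toList))   // collect the names per city
  .show(false)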

First I would turn your RDD into a Dataset:

val spark: org.apache.spark.sql.SparkSession = ???
import spark.implicits._

val testDs = test.toDS()

This is how you can see your column names :) use them wisely!

testDs.schema.fields.foreach(x => println(x))
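For a Dataset built from a Seq of tuples, the columns default to _1 and _2, so the loop above should print something along the lines of:

StructField(_1,StringType,true)
StructField(_2,StringType,true)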

Finally, you just need a groupBy:

testDs.groupBy("_1")   // "_1" is the city column here; substitute your own column names
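To reproduce the grouped output of the RDD version, a minimal sketch (the column and alias names below are my own, not part of the original answer) could rename the tuple columns and aggregate with collect_list:

import org.apache.spark.sql.functions.collect_list

testDs
  .toDF("city", "name")                    // give the tuple columns readable names
  .groupBy("city")
  .agg(collect_list("name").as("names"))
  .show(false)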

In my opinion, RDDs are not really the way to go in 2.0. If you have any questions, just ask.

I would recommend you start by creating a case class such as

case class Monkey(city: String, firstName: String)

This case class should be defined outside of the main class. Then you can simply call the toDS function, group with groupBy, and use the collect_list aggregation function as follows:

import sqlContext.implicits._
import org.apache.spark.sql.functions._

val test = Seq(("New York", "Jack"),
  ("Los Angeles", "Tom"),
  ("Chicago", "David"),
  ("Houston", "John"),
  ("Detroit", "Michael"),
  ("Chicago", "Andrew"),
  ("Detroit", "Peter"),
  ("Detroit", "George")
)
sc.parallelize(test)
  .map(row => Monkey(row._1, row._2))
  .toDS()
  .groupBy("city")
  .agg(collect_list("firstName") as "list")
  .show(false)

You will get the following output:
+-----------+------------------------+
|city       |list                    |
+-----------+------------------------+
|Los Angeles|[Tom]                   |
|Detroit    |[Michael, Peter, George]|
|Chicago    |[David, Andrew]         |
|Houston    |[John]                  |
|New York   |[Jack]                  |
+-----------+------------------------+

You can always convert back to an RDD by calling the .rdd function.
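For example (a sketch; the val names are just for illustration):

val monkeysDs = sc.parallelize(test)
  .map(row => Monkey(row._1, row._2))
  .toDS()

val monkeysRdd = monkeysDs.rdd   // back to an RDD[Monkey]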

To create a Dataset, first define a case class outside of your class, such as

case class Employee(city: String, name: String)

Then you can convert the list to a Dataset:

import org.apache.spark.sql.SparkSession

val spark =
  SparkSession.builder().master("local").appName("test").getOrCreate()
import spark.implicits._

val test = Seq(("New York", "Jack"),
  ("Los Angeles", "Tom"),
  ("Chicago", "David"),
  ("Houston", "John"),
  ("Detroit", "Michael"),
  ("Chicago", "Andrew"),
  ("Detroit", "Peter"),
  ("Detroit", "George")
).toDF("city", "name")

val data = test.as[Employee]

Alternatively, you can map each tuple to the case class and call toDS directly:

import spark.implicits._

val test = Seq(("New York", "Jack"),
  ("Los Angeles", "Tom"),
  ("Chicago", "David"),
  ("Houston", "John"),
  ("Detroit", "Michael"),
  ("Chicago", "Andrew"),
  ("Detroit", "Peter"),
  ("Detroit", "George")
)

val data = test.map(r => Employee(r._1, r._2)).toDS()

Now you can groupBy and perform any aggregation:

data.groupBy("city").count().show

import org.apache.spark.sql.functions.collect_list

data.groupBy("city").agg(collect_list("name")).show
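If you prefer to stay in the typed Dataset API, the same grouping can be sketched with groupByKey (this is my own variation, not required for the answer above):

data
  .groupByKey(_.city)
  .mapGroups((city, rows) => (city, rows.map(_.name).toList))
  .show(false)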

Hope this helps!