如何解压 Spark 数据集中的多个密钥
How to unpack multiple keys in a Spark DataSet
我有以下 DataSet
,具有以下结构。
case class Person(age: Int, gender: String, salary: Double)
我想通过 gender
和 age
确定 平均 薪水,因此我按两个键对 DS
进行了分组。我遇到了两个主要问题,一个是两个键混合在一个键中,但我想将它们保留在两个不同的列中,另一个是 aggregated
列有一个愚蠢的长名称,我可以不知道如何重命名它(显然 as
和 alias
不起作用)所有这些都使用 DS API
.
val df = sc.parallelize(List(Person(100000.00, "male", 27),
Person(120000.00, "male", 27),
Person(95000, "male", 26),
Person(89000, "female", 31),
Person(250000, "female", 51),
Person(120000, "female", 51)
)).toDF.as[Person]
df.groupByKey(p => (p.gender, p.age)).agg(typed.avg(_.salary)).show()
+-----------+------------------------------------------------------------------------------------------------+
| key| TypedAverage(line2503618a50834b67a4b132d1b8d2310b12.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Person)|
+-----------+------------------------------------------------------------------------------------------------+
|[female,31]| 89000.0...
|[female,51]| 185000.0...
| [male,27]| 110000.0...
| [male,26]| 95000.0...
+-----------+------------------------------------------------------------------------------------------------+
别名是一个未输入的操作,因此您必须在之后重新输入。解压缩密钥的唯一方法是在之后通过 select 或其他方式进行:
df.groupByKey(p => (p.gender, p.age))
.agg(typed.avg[Person](_.salary).as("average_salary").as[Double])
.select($"key._1",$"key._2",$"average_salary").show
实现这两个目标的最简单方法是map()
再次从聚合结果到Person
实例:
.map{case ((gender, age), salary) => Person(gender, age, salary)}
如果在 class 的构造函数中稍微重新安排参数的顺序,结果将看起来最好:
case class Person(gender: String, age: Int, salary: Double)
+------+---+--------+
|gender|age| salary|
+------+---+--------+
|female| 31| 89000.0|
|female| 51|185000.0|
| male| 27|110000.0|
| male| 26| 95000.0|
+------+---+--------+
完整代码:
import session.implicits._
val df = session.sparkContext.parallelize(List(
Person("male", 27, 100000),
Person("male", 27, 120000),
Person("male", 26, 95000),
Person("female", 31, 89000),
Person("female", 51, 250000),
Person("female", 51, 120000)
)).toDS
import org.apache.spark.sql.expressions.scalalang.typed
df.groupByKey(p => (p.gender, p.age))
.agg(typed.avg(_.salary))
.map{case ((gender, age), salary) => Person(gender, age, salary)}
.show()
我有以下 DataSet
,具有以下结构。
case class Person(age: Int, gender: String, salary: Double)
我想通过 gender
和 age
确定 平均 薪水,因此我按两个键对 DS
进行了分组。我遇到了两个主要问题,一个是两个键混合在一个键中,但我想将它们保留在两个不同的列中,另一个是 aggregated
列有一个愚蠢的长名称,我可以不知道如何重命名它(显然 as
和 alias
不起作用)所有这些都使用 DS API
.
val df = sc.parallelize(List(Person(100000.00, "male", 27),
Person(120000.00, "male", 27),
Person(95000, "male", 26),
Person(89000, "female", 31),
Person(250000, "female", 51),
Person(120000, "female", 51)
)).toDF.as[Person]
df.groupByKey(p => (p.gender, p.age)).agg(typed.avg(_.salary)).show()
+-----------+------------------------------------------------------------------------------------------------+
| key| TypedAverage(line2503618a50834b67a4b132d1b8d2310b12.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Person)|
+-----------+------------------------------------------------------------------------------------------------+
|[female,31]| 89000.0...
|[female,51]| 185000.0...
| [male,27]| 110000.0...
| [male,26]| 95000.0...
+-----------+------------------------------------------------------------------------------------------------+
别名是一个未输入的操作,因此您必须在之后重新输入。解压缩密钥的唯一方法是在之后通过 select 或其他方式进行:
df.groupByKey(p => (p.gender, p.age))
.agg(typed.avg[Person](_.salary).as("average_salary").as[Double])
.select($"key._1",$"key._2",$"average_salary").show
实现这两个目标的最简单方法是map()
再次从聚合结果到Person
实例:
.map{case ((gender, age), salary) => Person(gender, age, salary)}
如果在 class 的构造函数中稍微重新安排参数的顺序,结果将看起来最好:
case class Person(gender: String, age: Int, salary: Double)
+------+---+--------+
|gender|age| salary|
+------+---+--------+
|female| 31| 89000.0|
|female| 51|185000.0|
| male| 27|110000.0|
| male| 26| 95000.0|
+------+---+--------+
完整代码:
import session.implicits._
val df = session.sparkContext.parallelize(List(
Person("male", 27, 100000),
Person("male", 27, 120000),
Person("male", 26, 95000),
Person("female", 31, 89000),
Person("female", 51, 250000),
Person("female", 51, 120000)
)).toDS
import org.apache.spark.sql.expressions.scalalang.typed
df.groupByKey(p => (p.gender, p.age))
.agg(typed.avg(_.salary))
.map{case ((gender, age), salary) => Person(gender, age, salary)}
.show()