基于列值 (measure_type) 的具有数据透视表和不同聚合的 Spark Dataframe - Scala

Spark Dataframe with pivot and different aggregation, based on the column value (measure_type) - Scala

我有一个这种类型的 spark 数据框:

scala> val data = Seq((1, "k1", "measureA", 2), (1, "k1", "measureA", 4), (1, "k1", "measureB", 5), (1, "k1", "measureB", 7), (1, "k1", "measureC", 7), (1, "k1", "measureC", 1), (2, "k1", "measureB", 8), (2, "k1", "measureC", 9), (2, "k2", "measureA", 5), (2, "k2", "measureC", 5), (2, "k2", "measureC", 8))
data: Seq[(Int, String, String, Int)] = List((1,k1,measureA,2), (1,k1,measureA,4), (1,k1,measureB,5), (1,k1,measureB,7), (1,k1,measureC,7), (1,k1,measureC,1), (2,k1,measureB,8), (2,k1,measureC,9), (2,k2,measureA,5), (2,k2,measureC,5), (2,k2,measureC,8))

scala> val rdd = spark.sparkContext.parallelize(data)
rdd: org.apache.spark.rdd.RDD[(Int, String, String, Int)] = ParallelCollectionRDD[22] at parallelize at <console>:27

scala> val df = rdd.toDF("ts","key","measure_type","value")
df: org.apache.spark.sql.DataFrame = [ts: int, key: string ... 2 more fields]

scala> df.show
+---+---+------------+-----+
| ts|key|measure_type|value|
+---+---+------------+-----+
|  1| k1|    measureA|    2|
|  1| k1|    measureA|    4|
|  1| k1|    measureB|    5|
|  1| k1|    measureB|    7|
|  1| k1|    measureC|    7|
|  1| k1|    measureC|    1|
|  2| k1|    measureB|    8|
|  2| k1|    measureC|    9|
|  2| k2|    measureA|    5|
|  2| k2|    measureC|    5|
|  2| k2|    measureC|    8|
+---+---+------------+-----+

我想以 measure_type 为中心并对值应用不同的聚合类型,具体取决于 measure_type:

然后,得到如下输出数据帧:

+---+---+--------+--------+--------+
| ts|key|measureA|measureB|measureC|
+---+---+--------+--------+--------+
|  1| k1|       6|       6|       7|
|  2| k1|    null|       8|       9|
|  2| k2|       5|    null|       8|
+---+---+--------+--------+--------+

非常感谢。

val ddf = df.groupBy("ts", "key").agg(
 sum(when(col("measure_type") === "measureA",col("value"))).as("measureA"),
 avg(when(col("measure_type") === "measureB",col("value"))).as("measureB"),
 max(when(col("measure_type") === "measureC",col("value"))).as("measureC"))

结果是

scala> ddf.show(false)
+---+---+--------+--------+--------+
|ts |key|measureA|measureB|measureC|
+---+---+--------+--------+--------+
|2  |k2 |5       |null    |8       |
|2  |k1 |null    |8.0     |9       |
|1  |k1 |6       |6.0     |7       |
+---+---+--------+--------+--------+

我认为使用传统的 pivot 函数很乏味,因为它只会限制您使用一个特定的聚合函数。

我要做的是映射一个预定义的聚合函数列表,我需要执行这些聚合函数并将它们应用到我的数据框上,为每个聚合函数提供 3 个额外的列,然后创建另一个具有值的列measure_type 如您所述,然后删除我在上一步中创建的 3 列

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
import spark.implicits._

val df = Seq((1, "k1", "measureA", 2), (1, "k1", "measureA", 4), (1, "k1", "measureB", 5), (1, "k1", "measureB", 7), (1, "k1", "measureC", 7), (1, "k1", "measureC", 1), (2, "k1", "measureB", 8), (2, "k1", "measureC", 9), (2, "k2", "measureA", 5), (2, "k2", "measureC", 5), (2, "k2", "measureC", 8)).toDF("ts","key","measure_type","value")
val mapping: Map[String, Column => Column] = Map(
  "sum" -> sum, "avg" -> avg, "max" -> max)

val groupBy = Seq("ts","key","measure_type")
val aggregate = Seq("value")
val operations = Seq("sum", "avg", "max")
val exprs = aggregate.flatMap(c => operations .map(f => mapping(f)(col(c))))

val df2 = df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*)

val df3 = df2.withColumn("new_column", 
           when($"measure_type" === "measureA", $"sum(value)")
           .when($"measure_type" === "measureB", $"avg(value)")
            .otherwise($"max(value)"))
          .drop("sum(value)")
          .drop("avg(value)")
          .drop("max(value)")

df3 是您需要的数据框。