Calculate maximum number of observations per group
I am using Spark 1.6.2.
I need to find the maximum count per group.
val myData = Seq(("aa1", "GROUP_A", "10"),("aa1","GROUP_A", "12"),("aa2","GROUP_A", "12"),("aa3", "GROUP_B", "14"),("aa3","GROUP_B", "11"),("aa3","GROUP_B","12" ),("aa2", "GROUP_B", "12"))
val df = sc.parallelize(myData).toDF("id","type","activity")
Let's first count the observations per group:
df.groupBy("type","id").count.show
+-------+---+-----+
| type| id|count|
+-------+---+-----+
|GROUP_A|aa1| 2|
|GROUP_A|aa2| 1|
|GROUP_B|aa2| 1|
|GROUP_B|aa3| 3|
+-------+---+-----+
This is the expected result:
+-------+---+-----+
|   type| id|count|
+-------+---+-----+
|GROUP_A|aa1|    2|
|GROUP_B|aa3|    3|
+-------+---+-----+
I tried this, but it did not work:
df.groupBy("type","id").count.filter("count = 'max'").show
You can use the max function after the group by.
val myData = Seq(("aa1", "GROUP_A", "10"),("aa1","GROUP_A", "12"),("aa2","GROUP_A", "12"),("aa3", "GROUP_B", "14"),("aa3","GROUP_B", "11"),("aa3","GROUP_B","12" ),("aa2", "GROUP_B", "12"))
val df = sc.parallelize(myData).toDF("id","type","activity")
import org.apache.spark.sql.functions._

// Count per (type, id) after the groupBy, alias the count as cnt, then find the maximum cnt per type.
val newDF = df.groupBy("type", "id").agg(count("*").alias("cnt"))
val df1 = newDF.groupBy("type").agg(max("cnt").alias("maxCnt"))
df1.show
Now you can join the two DataFrames to get the output.
df1.as("agg").join(newDF.as("newDF"),
  $"agg.type" === $"newDF.type" && $"newDF.cnt" === $"agg.maxCnt").select($"newDF.*").show
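For reference, the same aggregate-and-join logic can also be written in Spark SQL. This is only a sketch assuming a spark-shell style sqlContext is available (as in Spark 1.6); the temp table name "counts" is made up for the example:

// Register the per-(type, id) counts as a temporary table (Spark 1.6 API)
newDF.registerTempTable("counts")
// For each type, keep the rows whose cnt equals that type's maximum cnt
sqlContext.sql(
  """SELECT c.type, c.id, c.cnt
    |FROM counts c
    |JOIN (SELECT type, MAX(cnt) AS max_cnt FROM counts GROUP BY type) m
    |  ON c.type = m.type AND c.cnt = m.max_cnt""".stripMargin).show()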
要获得 "row with maximum value of column X"(而不仅仅是那个最大值),您可以使用 "grouping" 这个小技巧将相关列一起放入包含排序列的 struct
作为第一列 - 然后计算该结构的 max
。由于 struct
的排序是 "dominated" 按其第一列的排序 - 我们将得到所需的结果:
df.groupBy("id","type").count() // get count per id and type
.groupBy("type") // now group by type only
.agg(max(struct("count", "id")) as "struct") // get maximum of (count, id) structs - since count is first, and id is unique - count will decide the ordering
.select($"type", $"struct.id" as "id", $"struct.count" as "count") // "unwrap" structs
.show()
// +-------+---+-----+
// | type| id|count|
// +-------+---+-----+
// |GROUP_A|aa1| 2|
// |GROUP_B|aa3| 3|
// +-------+---+-----+
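As a side note (a minimal sketch, not part of the original answer): if two ids in a group end up with the same count, the second struct field breaks the tie. For example, appending one hypothetical extra row so that aa2 also reaches count 2 in GROUP_A:

// Hypothetical extra row so that aa2 also has count 2 in GROUP_A
val dfTie = sc.parallelize(myData :+ ("aa2", "GROUP_A", "13")).toDF("id", "type", "activity")
dfTie.groupBy("id", "type").count()
  .groupBy("type")
  .agg(max(struct("count", "id")) as "struct")
  .select($"type", $"struct.id" as "id", $"struct.count" as "count")
  .show()
// GROUP_A now resolves to (aa2, 2): the counts tie at 2, and "aa2" > "aa1" in string order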
You can use a Window function to find the max and then drop the duplicates, building on @Tzach's answer above:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val windowSpec = Window.partitionBy(col("type"))
df.groupBy("type","id").count()
.withColumn("count", max(struct("count", "id")).over(windowSpec))
.dropDuplicates("type")
.select($"type", $"count.id" as "id", $"count.count" as "count").show
Thanks