计算总和,多个前K值的计数火花

Calculating sum,count of multiple top K values spark

我有一个格式为

的输入数据框
+---------------------------------+
|name| values |score    |row_number|
+---------------------------------+
|A    |1000   |0        |1        |
|B    |947    |0        |2        |
|C    |923    |1        |3        |
|D    |900    |2        |4        |
|E    |850    |3        |5        |
|F    |800    |1        |6        |
+---------------------------------+

我需要在分数 > 0 和 row_number < K (i,e) 数据帧中的前 k 个值的分数 > 0 时获取所有值的总和。

我可以通过运行以下查询前 100 个值

来实现这一点
val top_100_data = df.select(
      count(when(col("score") > 0 and col("row_number")<=100, col("values"))).alias("count_100"),
      sum(when(col("score") > 0 and col("row_number")<=100, col("values"))).alias("sum_filtered_100"),
      sum(when(col("row_number") <=100, col(values))).alias("total_sum_100")
    )

但是,我需要获取前 100,200,300......2500 的数据。这意味着我需要 运行 这个查询 25 次,最后合并 25 个数据帧。

我是 spark 的新手,但仍在想办法解决很多问题。解决这个问题的最佳方法是什么?

谢谢!!

您可以创建一个 Array 的限制作为

val topFilters = Array(100, 200, 300) // you can add more

然后您可以遍历 topFilters 数组并创建您需要的 dataframe我建议你使用 join 而不是 union 因为 join 会给你单独的 columnsunions 会给你单独的 rows.您可以执行以下操作

鉴于您的 dataframe

+----+------+-----+----------+
|name|values|score|row_number|
+----+------+-----+----------+
|A   |1000  |0    |1         |
|B   |947   |0    |2         |
|C   |923   |1    |3         |
|D   |900   |2    |200       |
|E   |850   |3    |150       |
|F   |800   |1    |250       |
+----+------+-----+----------+

您可以使用上面定义的 topFilters 数组

import sqlContext.implicits._
import org.apache.spark.sql.functions._
var finalDF : DataFrame = Seq("1").toDF("rowNum")
for(k <- topFilters) {
  val top_100_data = df.select(lit("1").as("rowNum"), sum(when(col("score") > 0 && col("row_number") < k, col("values"))).alias(s"total_sum_$k"))
  finalDF = finalDF.join(top_100_data, Seq("rowNum"))
}
finalDF.show(false)

哪个应该给你最终的 dataframe 作为

+------+-------------+-------------+-------------+
|rowNum|total_sum_100|total_sum_200|total_sum_300|
+------+-------------+-------------+-------------+
|1     |923          |1773         |3473         |
+------+-------------+-------------+-------------+

您可以对现有的 25 个限制执行相同的操作。

如果你打算使用union,那么思路同上。

希望回答对你有帮助

已更新

如果您需要并集,那么您可以对上面定义的相同限制数组应用以下逻辑

var finalDF : DataFrame = Seq((0, 0, 0, 0)).toDF("limit", "count", "sum_filtered", "total_sum")
for(k <- topFilters) {
  val top_100_data = df.select(lit(k).as("limit"), count(when(col("score") > 0 and col("row_number")<=k, col("values"))).alias("count"),
    sum(when(col("score") > 0 and col("row_number")<=k, col("values"))).alias("sum_filtered"),
    sum(when(col("row_number") <=k, col("values"))).alias("total_sum"))
  finalDF = finalDF.union(top_100_data)
}
finalDF.filter(col("limit") =!= 0).show(false)

哪个应该给你

+-----+-----+------------+---------+
|limit|count|sum_filtered|total_sum|
+-----+-----+------------+---------+
|100  |1    |923         |2870     |
|200  |3    |2673        |4620     |
|300  |4    |3473        |5420     |
+-----+-----+------------+---------+