计算总和,多个前K值的计数火花
Calculating sum,count of multiple top K values spark
我有一个格式为
的输入数据框
+---------------------------------+
|name| values |score |row_number|
+---------------------------------+
|A |1000 |0 |1 |
|B |947 |0 |2 |
|C |923 |1 |3 |
|D |900 |2 |4 |
|E |850 |3 |5 |
|F |800 |1 |6 |
+---------------------------------+
我需要在分数 > 0 和 row_number < K (i,e) 数据帧中的前 k 个值的分数 > 0 时获取所有值的总和。
我可以通过运行以下查询前 100 个值
来实现这一点
val top_100_data = df.select(
count(when(col("score") > 0 and col("row_number")<=100, col("values"))).alias("count_100"),
sum(when(col("score") > 0 and col("row_number")<=100, col("values"))).alias("sum_filtered_100"),
sum(when(col("row_number") <=100, col(values))).alias("total_sum_100")
)
但是,我需要获取前 100,200,300......2500 的数据。这意味着我需要 运行 这个查询 25 次,最后合并 25 个数据帧。
我是 spark 的新手,但仍在想办法解决很多问题。解决这个问题的最佳方法是什么?
谢谢!!
您可以创建一个 Array
的限制作为
val topFilters = Array(100, 200, 300) // you can add more
然后您可以遍历 topFilters
数组并创建您需要的 dataframe
。 我建议你使用 join
而不是 union
因为 join
会给你单独的 columns
而 unions
会给你单独的 rows
.您可以执行以下操作
鉴于您的 dataframe
为
+----+------+-----+----------+
|name|values|score|row_number|
+----+------+-----+----------+
|A |1000 |0 |1 |
|B |947 |0 |2 |
|C |923 |1 |3 |
|D |900 |2 |200 |
|E |850 |3 |150 |
|F |800 |1 |250 |
+----+------+-----+----------+
您可以使用上面定义的 topFilters
数组
import sqlContext.implicits._
import org.apache.spark.sql.functions._
var finalDF : DataFrame = Seq("1").toDF("rowNum")
for(k <- topFilters) {
val top_100_data = df.select(lit("1").as("rowNum"), sum(when(col("score") > 0 && col("row_number") < k, col("values"))).alias(s"total_sum_$k"))
finalDF = finalDF.join(top_100_data, Seq("rowNum"))
}
finalDF.show(false)
哪个应该给你最终的 dataframe
作为
+------+-------------+-------------+-------------+
|rowNum|total_sum_100|total_sum_200|total_sum_300|
+------+-------------+-------------+-------------+
|1 |923 |1773 |3473 |
+------+-------------+-------------+-------------+
您可以对现有的 25 个限制执行相同的操作。
如果你打算使用union
,那么思路同上。
希望回答对你有帮助
已更新
如果您需要并集,那么您可以对上面定义的相同限制数组应用以下逻辑
var finalDF : DataFrame = Seq((0, 0, 0, 0)).toDF("limit", "count", "sum_filtered", "total_sum")
for(k <- topFilters) {
val top_100_data = df.select(lit(k).as("limit"), count(when(col("score") > 0 and col("row_number")<=k, col("values"))).alias("count"),
sum(when(col("score") > 0 and col("row_number")<=k, col("values"))).alias("sum_filtered"),
sum(when(col("row_number") <=k, col("values"))).alias("total_sum"))
finalDF = finalDF.union(top_100_data)
}
finalDF.filter(col("limit") =!= 0).show(false)
哪个应该给你
+-----+-----+------------+---------+
|limit|count|sum_filtered|total_sum|
+-----+-----+------------+---------+
|100 |1 |923 |2870 |
|200 |3 |2673 |4620 |
|300 |4 |3473 |5420 |
+-----+-----+------------+---------+
我有一个格式为
的输入数据框+---------------------------------+
|name| values |score |row_number|
+---------------------------------+
|A |1000 |0 |1 |
|B |947 |0 |2 |
|C |923 |1 |3 |
|D |900 |2 |4 |
|E |850 |3 |5 |
|F |800 |1 |6 |
+---------------------------------+
我需要在分数 > 0 和 row_number < K (i,e) 数据帧中的前 k 个值的分数 > 0 时获取所有值的总和。
我可以通过运行以下查询前 100 个值
来实现这一点val top_100_data = df.select(
count(when(col("score") > 0 and col("row_number")<=100, col("values"))).alias("count_100"),
sum(when(col("score") > 0 and col("row_number")<=100, col("values"))).alias("sum_filtered_100"),
sum(when(col("row_number") <=100, col(values))).alias("total_sum_100")
)
但是,我需要获取前 100,200,300......2500 的数据。这意味着我需要 运行 这个查询 25 次,最后合并 25 个数据帧。
我是 spark 的新手,但仍在想办法解决很多问题。解决这个问题的最佳方法是什么?
谢谢!!
您可以创建一个 Array
的限制作为
val topFilters = Array(100, 200, 300) // you can add more
然后您可以遍历 topFilters
数组并创建您需要的 dataframe
。 我建议你使用 join
而不是 union
因为 join
会给你单独的 columns
而 unions
会给你单独的 rows
.您可以执行以下操作
鉴于您的 dataframe
为
+----+------+-----+----------+
|name|values|score|row_number|
+----+------+-----+----------+
|A |1000 |0 |1 |
|B |947 |0 |2 |
|C |923 |1 |3 |
|D |900 |2 |200 |
|E |850 |3 |150 |
|F |800 |1 |250 |
+----+------+-----+----------+
您可以使用上面定义的 topFilters
数组
import sqlContext.implicits._
import org.apache.spark.sql.functions._
var finalDF : DataFrame = Seq("1").toDF("rowNum")
for(k <- topFilters) {
val top_100_data = df.select(lit("1").as("rowNum"), sum(when(col("score") > 0 && col("row_number") < k, col("values"))).alias(s"total_sum_$k"))
finalDF = finalDF.join(top_100_data, Seq("rowNum"))
}
finalDF.show(false)
哪个应该给你最终的 dataframe
作为
+------+-------------+-------------+-------------+
|rowNum|total_sum_100|total_sum_200|total_sum_300|
+------+-------------+-------------+-------------+
|1 |923 |1773 |3473 |
+------+-------------+-------------+-------------+
您可以对现有的 25 个限制执行相同的操作。
如果你打算使用union
,那么思路同上。
希望回答对你有帮助
已更新
如果您需要并集,那么您可以对上面定义的相同限制数组应用以下逻辑
var finalDF : DataFrame = Seq((0, 0, 0, 0)).toDF("limit", "count", "sum_filtered", "total_sum")
for(k <- topFilters) {
val top_100_data = df.select(lit(k).as("limit"), count(when(col("score") > 0 and col("row_number")<=k, col("values"))).alias("count"),
sum(when(col("score") > 0 and col("row_number")<=k, col("values"))).alias("sum_filtered"),
sum(when(col("row_number") <=k, col("values"))).alias("total_sum"))
finalDF = finalDF.union(top_100_data)
}
finalDF.filter(col("limit") =!= 0).show(false)
哪个应该给你
+-----+-----+------------+---------+
|limit|count|sum_filtered|total_sum|
+-----+-----+------------+---------+
|100 |1 |923 |2870 |
|200 |3 |2673 |4620 |
|300 |4 |3473 |5420 |
+-----+-----+------------+---------+