如何使用 Scala 在 Spark 中滑动 Window 排名?
How to do Sliding Window Rank in Spark using Scala?
我有一个数据集:
+-----+-------------------+---------------------+------------------+
|query|similar_queries |model_score |count |
+-----+-------------------+---------------------+------------------+
|shirt|funny shirt |0.0034038130658784866|189.0 |
|shirt|shirt womens |0.0019435265241921438|136.0 |
|shirt|watch |0.001097496453284101 |212.0 |
|shirt|necklace |6.694577024597908E-4 |151.0 |
|shirt|white shirt |0.0037413097560623485|217.0 |
|shirt|shoes |0.0022062579255572733|575.0 |
|shirt|crop top |9.065831060804897E-4 |173.0 |
|shirt|polo shirts for men|0.007706416273211698 |349.0 |
|shirt|shorts |0.002669621942466027 |200.0 |
|shirt|black shirt |0.03264296242546658 |114.0 |
+-----+-------------------+---------------------+------------------+
我是先根据"count"对数据集进行排名。
lazy val countWindowByFreq = Window.partitionBy(col(QUERY)).orderBy(col(COUNT).desc)
val ranked_data = data.withColumn("count_rank", row_number over countWindowByFreq)
+-----+-------------------+---------------------+------------------+----------+
|query|similar_queries |model_score |count |count_rank|
+-----+-------------------+---------------------+------------------+----------+
|shirt|shoes |0.0022062579255572733|575.0 |1 |
|shirt|polo shirts for men|0.007706416273211698 |349.0 |2 |
|shirt|white shirt |0.0037413097560623485|217.0 |3 |
|shirt|watch |0.001097496453284101 |212.0 |4 |
|shirt|shorts |0.002669621942466027 |200.0 |5 |
|shirt|funny shirt |0.0034038130658784866|189.0 |6 |
|shirt|crop top |9.065831060804897E-4 |173.0 |7 |
|shirt|necklace |6.694577024597908E-4 |151.0 |8 |
|shirt|shirt womens |0.0019435265241921438|136.0 |9 |
|shirt|black shirt |0.03264296242546658 |114.0 |10 |
+-----+-------------------+---------------------+------------------+----------+
我现在尝试在 row_number(4 行)上使用滚动 window 对内容进行排名,并根据 model_score 在 window 中排名。例如:
第一个window,row_number 1到4,新排名(新列)将是
1. polo shirts for men
2. white shirt
3. shoes
4. watch
在第window、row_number 5 到 8 中,新排名(新列)将是
5. funny shirt
6. shorts
7. shirt womens
8. crop top
在第window、row_number 9 后,新排名(新列)将是
9. black shirt
10. shirt womens
有人能告诉我如何用这个 spark 和 Scala 实现吗?有没有我可以使用的预定义函数?
我试过了:
惰性值 MODEL_RANK = Window.partitionBy(col(QUERY))
.orderBy(col(MODEL_SCORE).desc).rowsBetween( 0, 3)
但这给了我:
sql.AnalysisException: Window Frame ROWS BETWEEN CURRENT ROW AND 3 FOLLOWING must match the required frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW;
此外,尝试使用 .rowsBetween(-3, 0) 但这也给我错误:
org.apache.spark.sql.AnalysisException: Window Frame ROWS BETWEEN 3 PRECEDING AND CURRENT ROW must match the required frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW;
由于您已经 count_rank
进行了计算,下一步是找到一种方法将行分组为一组。可以这样操作:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val ranked_data_grouped = ranked_data
.withColumn("bucket", (($"count_rank" -1)/4).cast(IntegerType))
ranked_data_grouped 看起来像:
+-----+-------------------+---------------------+------------------+----------+-------+
|query|similar_queries |model_score |count |count_rank|bucket |
+-----+-------------------+---------------------+------------------+----------+-------+
|shirt|shoes |0.0022062579255572733|575.0 |1 |0 |
|shirt|polo shirts for men|0.007706416273211698 |349.0 |2 |0 |
|shirt|white shirt |0.0037413097560623485|217.0 |3 |0 |
|shirt|watch |0.001097496453284101 |212.0 |4 |0 |
|shirt|shorts |0.002669621942466027 |200.0 |5 |1 |
|shirt|funny shirt |0.0034038130658784866|189.0 |6 |1 |
|shirt|crop top |9.065831060804897E-4 |173.0 |7 |1 |
|shirt|necklace |6.694577024597908E-4 |151.0 |8 |1 |
|shirt|shirt womens |0.0019435265241921438|136.0 |9 |2 |
|shirt|black shirt |0.03264296242546658 |114.0 |10 |2 |
+-----+-------------------+---------------------+------------------+----------+-------+
现在,您所要做的就是按 bucket
分区并按 model_score
排序:
val output = ranked_data_grouped
.withColumn("finalRank", row_number().over(Window.partitionBy($"bucket").orderBy($"model_score".desc)))
我有一个数据集:
+-----+-------------------+---------------------+------------------+
|query|similar_queries |model_score |count |
+-----+-------------------+---------------------+------------------+
|shirt|funny shirt |0.0034038130658784866|189.0 |
|shirt|shirt womens |0.0019435265241921438|136.0 |
|shirt|watch |0.001097496453284101 |212.0 |
|shirt|necklace |6.694577024597908E-4 |151.0 |
|shirt|white shirt |0.0037413097560623485|217.0 |
|shirt|shoes |0.0022062579255572733|575.0 |
|shirt|crop top |9.065831060804897E-4 |173.0 |
|shirt|polo shirts for men|0.007706416273211698 |349.0 |
|shirt|shorts |0.002669621942466027 |200.0 |
|shirt|black shirt |0.03264296242546658 |114.0 |
+-----+-------------------+---------------------+------------------+
我是先根据"count"对数据集进行排名。
lazy val countWindowByFreq = Window.partitionBy(col(QUERY)).orderBy(col(COUNT).desc)
val ranked_data = data.withColumn("count_rank", row_number over countWindowByFreq)
+-----+-------------------+---------------------+------------------+----------+
|query|similar_queries |model_score |count |count_rank|
+-----+-------------------+---------------------+------------------+----------+
|shirt|shoes |0.0022062579255572733|575.0 |1 |
|shirt|polo shirts for men|0.007706416273211698 |349.0 |2 |
|shirt|white shirt |0.0037413097560623485|217.0 |3 |
|shirt|watch |0.001097496453284101 |212.0 |4 |
|shirt|shorts |0.002669621942466027 |200.0 |5 |
|shirt|funny shirt |0.0034038130658784866|189.0 |6 |
|shirt|crop top |9.065831060804897E-4 |173.0 |7 |
|shirt|necklace |6.694577024597908E-4 |151.0 |8 |
|shirt|shirt womens |0.0019435265241921438|136.0 |9 |
|shirt|black shirt |0.03264296242546658 |114.0 |10 |
+-----+-------------------+---------------------+------------------+----------+
我现在尝试在 row_number(4 行)上使用滚动 window 对内容进行排名,并根据 model_score 在 window 中排名。例如:
第一个window,row_number 1到4,新排名(新列)将是
1. polo shirts for men
2. white shirt
3. shoes
4. watch
在第window、row_number 5 到 8 中,新排名(新列)将是
5. funny shirt
6. shorts
7. shirt womens
8. crop top
在第window、row_number 9 后,新排名(新列)将是
9. black shirt
10. shirt womens
有人能告诉我如何用这个 spark 和 Scala 实现吗?有没有我可以使用的预定义函数?
我试过了:
惰性值 MODEL_RANK = Window.partitionBy(col(QUERY)) .orderBy(col(MODEL_SCORE).desc).rowsBetween( 0, 3)
但这给了我:
sql.AnalysisException: Window Frame ROWS BETWEEN CURRENT ROW AND 3 FOLLOWING must match the required frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW;
此外,尝试使用 .rowsBetween(-3, 0) 但这也给我错误:
org.apache.spark.sql.AnalysisException: Window Frame ROWS BETWEEN 3 PRECEDING AND CURRENT ROW must match the required frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW;
由于您已经 count_rank
进行了计算,下一步是找到一种方法将行分组为一组。可以这样操作:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val ranked_data_grouped = ranked_data
.withColumn("bucket", (($"count_rank" -1)/4).cast(IntegerType))
ranked_data_grouped 看起来像:
+-----+-------------------+---------------------+------------------+----------+-------+
|query|similar_queries |model_score |count |count_rank|bucket |
+-----+-------------------+---------------------+------------------+----------+-------+
|shirt|shoes |0.0022062579255572733|575.0 |1 |0 |
|shirt|polo shirts for men|0.007706416273211698 |349.0 |2 |0 |
|shirt|white shirt |0.0037413097560623485|217.0 |3 |0 |
|shirt|watch |0.001097496453284101 |212.0 |4 |0 |
|shirt|shorts |0.002669621942466027 |200.0 |5 |1 |
|shirt|funny shirt |0.0034038130658784866|189.0 |6 |1 |
|shirt|crop top |9.065831060804897E-4 |173.0 |7 |1 |
|shirt|necklace |6.694577024597908E-4 |151.0 |8 |1 |
|shirt|shirt womens |0.0019435265241921438|136.0 |9 |2 |
|shirt|black shirt |0.03264296242546658 |114.0 |10 |2 |
+-----+-------------------+---------------------+------------------+----------+-------+
现在,您所要做的就是按 bucket
分区并按 model_score
排序:
val output = ranked_data_grouped
.withColumn("finalRank", row_number().over(Window.partitionBy($"bucket").orderBy($"model_score".desc)))