使用 window 函数方法从列中激发 collect_set

Spark collect_set from a column using window function approach

我有一个包含工资的示例数据集。我想将该薪水分配到 3 个桶中,然后在每个桶中找到较低的薪水,然后将其转换为一个数组并将其附加到原始集合中。我正在尝试使用 window 函数来做到这一点。而且它似乎以渐进的方式进行。

这是我写的代码

val spark = sparkSession
import spark.implicits._
    
val simpleData = Seq(("James", "Sales", 3000),
  ("Michael", "Sales", 3100),
  ("Robert", "Sales", 3200),
  ("Maria", "Finance", 3300),
  ("James", "Sales", 3400),
  ("Scott", "Finance", 3500),
  ("Jen", "Finance", 3600),
  ("Jeff", "Marketing", 3700),
  ("Kumar", "Marketing", 3800),
  ("Saif", "Sales", 3900)
)
val df = simpleData.toDF("employee_name", "department", "salary")
val windowSpec = Window.orderBy("salary")
val ntileFrame = df.withColumn("ntile", ntile(3).over(windowSpec))
val lowWindowSpec = Window.partitionBy("ntile")
val ntileMinDf = ntileFrame.withColumn("lower_bound", min("salary").over(lowWindowSpec))
var rangeDf = ntileMinDf.withColumn("range", collect_set("lower_bound").over(windowSpec))
rangeDf.show()

我得到这样的数据集

+-------------+----------+------+-----+-----------+------------------+
|employee_name|department|salary|ntile|lower_bound|             range|
+-------------+----------+------+-----+-----------+------------------+
|        James|     Sales|  3000|    1|       3000|            [3000]|
|      Michael|     Sales|  3100|    1|       3000|            [3000]|
|       Robert|     Sales|  3200|    1|       3000|            [3000]|
|        Maria|   Finance|  3300|    1|       3000|            [3000]|
|        James|     Sales|  3400|    2|       3400|      [3000, 3400]|
|        Scott|   Finance|  3500|    2|       3400|      [3000, 3400]|
|          Jen|   Finance|  3600|    2|       3400|      [3000, 3400]|
|         Jeff| Marketing|  3700|    3|       3700|[3000, 3700, 3400]|
|        Kumar| Marketing|  3800|    3|       3700|[3000, 3700, 3400]|
|         Saif|     Sales|  3900|    3|       3700|[3000, 3700, 3400]|
+-------------+----------+------+-----+-----------+------------------+

我希望数据集看起来像这样

+-------------+----------+------+-----+-----------+------------------+
|employee_name|department|salary|ntile|lower_bound|             range|
+-------------+----------+------+-----+-----------+------------------+
|        James|     Sales|  3000|    1|       3000|[3000, 3700, 3400]|
|      Michael|     Sales|  3100|    1|       3000|[3000, 3700, 3400]|
|       Robert|     Sales|  3200|    1|       3000|[3000, 3700, 3400]|
|        Maria|   Finance|  3300|    1|       3000|[3000, 3700, 3400]|
|        James|     Sales|  3400|    2|       3400|[3000, 3700, 3400]|
|        Scott|   Finance|  3500|    2|       3400|[3000, 3700, 3400]|
|          Jen|   Finance|  3600|    2|       3400|[3000, 3700, 3400]|
|         Jeff| Marketing|  3700|    3|       3700|[3000, 3700, 3400]|
|        Kumar| Marketing|  3800|    3|       3700|[3000, 3700, 3400]|
|         Saif|     Sales|  3900|    3|       3700|[3000, 3700, 3400]|
+-------------+----------+------+-----+-----------+------------------+

为了确保您的 windows 考虑到所有行而不仅仅是当前行之前的行,您可以使用 rowsBetween 方法并将 Window.unboundedPrecedingWindow.unboundedFollowing 作为参数.你的最后一行因此变成:

var rangeDf = ntileMinDf.withColumn(
  "range",
  collect_set("lower_bound")
     .over(Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
)

你得到以下 rangeDf 数据框:

+-------------+----------+------+-----+-----------+------------------+
|employee_name|department|salary|ntile|lower_bound|             range|
+-------------+----------+------+-----+-----------+------------------+
|        James|     Sales|  3000|    1|       3000|[3000, 3700, 3400]|
|      Michael|     Sales|  3100|    1|       3000|[3000, 3700, 3400]|
|       Robert|     Sales|  3200|    1|       3000|[3000, 3700, 3400]|
|        Maria|   Finance|  3300|    1|       3000|[3000, 3700, 3400]|
|        James|     Sales|  3400|    2|       3400|[3000, 3700, 3400]|
|        Scott|   Finance|  3500|    2|       3400|[3000, 3700, 3400]|
|          Jen|   Finance|  3600|    2|       3400|[3000, 3700, 3400]|
|         Jeff| Marketing|  3700|    3|       3700|[3000, 3700, 3400]|
|        Kumar| Marketing|  3800|    3|       3700|[3000, 3700, 3400]|
|         Saif|     Sales|  3900|    3|       3700|[3000, 3700, 3400]|
+-------------+----------+------+-----+-----------+------------------+