Group a Spark dataset on one column, ordered by descending values of another column

I have a dataset that I sorted by the EXPEND column in descending order, which gives the following result:

+---------+----------+----------------+
| FACTORY | CUSTOMER |     EXPEND     |
+---------+----------+----------------+
| ABC     | JOHN     | 147,883,593.00 |
| ABC     | DAVE     | 91,679,200.00  |
| ABC     | PET      | 61,424,237.00  |
| ABC     | DIN      | 18,613,473.00  |
| ABC     | INU      | 13,593,258.50  |
| DEF     | JOHN     | 8,438,527.00   |
| DEF     | DAVE     | 6,804,375.50   |
| DEF     | PET      | 2,569,754.16   |
| DEF     | DIN      | 2,540,791.00   |
| DEF     | INU      | 995,163.00     |
| DEF     | PET      | 173,020.00     |
+---------+----------+----------------+

But the result I want is this:

+---------+----------+----------------+
| FACTORY | CUSTOMER |     EXPEND     |
+---------+----------+----------------+
| ABC     | JOHN     | 147,883,593.00 |
| DEF     | JOHN     | 8,438,527.00   |
| ABC     | DAVE     | 91,679,200.00  |
| DEF     | DAVE     | 6,804,375.50   |
| ABC     | PET      | 61,424,237.00  |
| DEF     | PET      | 2,569,754.16   |
| DEF     | PET      | 173,020.00     |
| ABC     | DIN      | 18,613,473.00  |
| DEF     | DIN      | 2,540,791.00   |
| ABC     | INU      | 13,593,258.50  |
| DEF     | INU      | 995,163.00     |
+---------+----------+----------------+

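That is, rows for the same customer are kept together, and the customer groups are ordered by descending expenditure.

How can I achieve this? This is only a sample; my real dataset can be more complex :(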

You can extract the customers in the correct order into a new DataFrame, then join it with the original DataFrame:

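// Assumes a SparkSession named `spark` is in scope (as in spark-shell)
import org.apache.spark.sql.functions.{desc, max, monotonically_increasing_id}
import spark.implicits._

val original = Seq(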
  ("ABC", "JOHN", 147883593.00),
  ("ABC", "DAVE", 91679200.00),
  ("ABC", "PET", 61424237.00),
  ("ABC", "DIN", 18613473.00),
  ("ABC", "INU", 13593258.50),
  ("DEF", "JOHN", 8438527.00),
  ("DEF", "DAVE", 6804375.50),
  ("DEF", "PET", 2569754.16),
  ("DEF", "DIN", 2540791.00),
  ("DEF", "INU", 995163.00),
  ("DEF", "PET", 173020.00)
).toDF(
  "FACTORY", "CUSTOMER", "EXPEND"
)

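// One row per customer, sorted by that customer's largest EXPEND;
// ORDER then increases in that sorted order
val customersInProperOrder = original
  .groupBy("CUSTOMER")
  .agg(max("EXPEND").alias("EXPEND"))
  .orderBy(desc("EXPEND"))
  .drop("EXPEND")
  .withColumn("ORDER", monotonically_increasing_id())

// Attach ORDER to every original row, sort by it, then by EXPEND within each customer
val result = original.alias("o")
  .join(customersInProperOrder.alias("c"), $"o.CUSTOMER" === $"c.CUSTOMER")
  .orderBy($"ORDER", desc("EXPEND"))
  .drop($"c.CUSTOMER")
  .drop($"c.ORDER")

result.show(false)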

Result:

+-------+--------+------------+
|FACTORY|CUSTOMER|EXPEND      |
+-------+--------+------------+
|ABC    |JOHN    |1.47883593E8|
|DEF    |JOHN    |8438527.0   |
|ABC    |DAVE    |9.16792E7   |
|DEF    |DAVE    |6804375.5   |
|ABC    |PET     |6.1424237E7 |
|DEF    |PET     |2569754.16  |
|DEF    |PET     |173020.0    |
|ABC    |DIN     |1.8613473E7 |
|DEF    |DIN     |2540791.0   |
|ABC    |INU     |1.35932585E7|
|DEF    |INU     |995163.0    |
+-------+--------+------------+
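
As a possible alternative to the join, the same ordering can probably be obtained with a window function: compute each customer's largest EXPEND over a window partitioned by CUSTOMER, then sort on that value. The sketch below is not part of the original answer. The names `expenses`, `ordered`, and `CUSTOMER_MAX` are made up for illustration, and it assumes Spark SQL is on the classpath and that a local session is acceptable.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, max}

// Assumption: a local session; in spark-shell getOrCreate() reuses the existing one
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("group-order-sketch")
  .getOrCreate()
import spark.implicits._

// Same sample rows as in the question
val expenses = Seq(
  ("ABC", "JOHN", 147883593.00),
  ("ABC", "DAVE", 91679200.00),
  ("ABC", "PET", 61424237.00),
  ("ABC", "DIN", 18613473.00),
  ("ABC", "INU", 13593258.50),
  ("DEF", "JOHN", 8438527.00),
  ("DEF", "DAVE", 6804375.50),
  ("DEF", "PET", 2569754.16),
  ("DEF", "DIN", 2540791.00),
  ("DEF", "INU", 995163.00),
  ("DEF", "PET", 173020.00)
).toDF("FACTORY", "CUSTOMER", "EXPEND")

// Window covering all rows that share a CUSTOMER value
val byCustomer = Window.partitionBy("CUSTOMER")

val ordered = expenses
  // CUSTOMER_MAX (hypothetical helper column): the customer's largest EXPEND, repeated on each of its rows
  .withColumn("CUSTOMER_MAX", max("EXPEND").over(byCustomer))
  // Sort groups by their maximum; CUSTOMER breaks ties so that two customers
  // with equal maxima are not interleaved; then sort rows within each group
  .orderBy(desc("CUSTOMER_MAX"), col("CUSTOMER"), desc("EXPEND"))
  .drop("CUSTOMER_MAX")

ordered.show(false)
```

On the sample data this should give the same row order as the result above. It avoids the self-join, at the cost of a shuffle for the window partitioning, so which variant is faster on a large dataset would need to be measured.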