在 apache spark scala 中排序和排名?

Sorting and ranking in apache spark scala?

想在spark中做排名,如下:

输入:

5.6
5.6
5.6
6.2
8.1
5.5
5.5

排名:

1
1
1
2
3
0
0
0

输出:

Rank Input 
0     5.5
0     5.5
1     5.6
1     5.6
1     5.6
2     6.2
3     8.1

我想知道如何在 spark 中对这些进行排序并获得与上面列出的相同的排名。要求是:

  1. 排名从 0 开始,而不是 1
  2. 这是数百万条记录的示例案例,一个分区可能非常大 - 我很欣赏有关如何使用内部排序方法进行排名的建议

我想在 scala 中做这个。有人可以帮我写代码吗?

如果您希望只有 一些 排名,您可以首先获取所有 distinct 值,将它们收集为 List 并将它们转换为 BroadCast.下面,我展示了一个肮脏的例子,注意它不能保证输出将被排序(可能有更好的方法,但这是我想到的第一件事):

// Case 1. k is small (fits in the driver and nodes)
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2))
val distincts = rdd.distinct.collect.sortBy(x => x)
val broadcast = sc.broadcast(distincts)

val sdd = rdd.map{
  case i: Int => (broadcast.value.asInstanceOf[Array[Int]].indexOf(i), i)
}

sdd.collect()

// Array[(Int, Int)] = Array((0,1), (0,1), (4,44), (2,4), (0,1), (3,33), (4,44), (0,1), (1,2))

在第二种方法中,我使用 Spark 的功能进行排序,在 RDD's documentation 中,您可以找到 zipWithIndexkeyBy 的工作原理。

//case 2. k is big, distinct values don't fit in the Driver.
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2))
val distincts = rdd.distinct.sortBy(x => x).zipWithIndex
rdd.keyBy(x => x)
  .join(distincts.keyBy(_._1))
  .map{
    case (value: Int, (v1: Int, (v2: Int, index: Long))) => (index, value)
  }.collect()

//res15: Array[(Long, Int)] = Array((3,33), (2,4), (0,1), (0,1), (0,1), (0,1), (4,44), (4,44), (1,2))

顺便说一下,我使用 collect 只是为了可视化目的,在真实的应用程序中你不应该使用它,除非你确定它适合驱动程序的内存。