在 apache spark scala 中排序和排名?
Sorting and ranking in apache spark scala?
想在spark中做排名,如下:
输入:
5.6
5.6
5.6
6.2
8.1
5.5
5.5
排名:
1
1
1
2
3
0
0
0
输出:
Rank Input
0 5.5
0 5.5
1 5.6
1 5.6
1 5.6
2 6.2
3 8.1
我想知道如何在 spark 中对这些进行排序并获得与上面列出的相同的排名。要求是:
- 排名从 0 开始,而不是 1
- 这是数百万条记录的示例案例,一个分区可能非常大 - 我很欣赏有关如何使用内部排序方法进行排名的建议
我想在 scala 中做这个。有人可以帮我写代码吗?
如果您希望只有 一些 排名,您可以首先获取所有 distinct
值,将它们收集为 List
并将它们转换为 BroadCast
.下面,我展示了一个肮脏的例子,注意它不能保证输出将被排序(可能有更好的方法,但这是我想到的第一件事):
// Case 1. k is small (fits in the driver and nodes)
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2))
val distincts = rdd.distinct.collect.sortBy(x => x)
val broadcast = sc.broadcast(distincts)
val sdd = rdd.map{
case i: Int => (broadcast.value.asInstanceOf[Array[Int]].indexOf(i), i)
}
sdd.collect()
// Array[(Int, Int)] = Array((0,1), (0,1), (4,44), (2,4), (0,1), (3,33), (4,44), (0,1), (1,2))
在第二种方法中,我使用 Spark 的功能进行排序,在 RDD's documentation 中,您可以找到 zipWithIndex
和 keyBy
的工作原理。
//case 2. k is big, distinct values don't fit in the Driver.
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2))
val distincts = rdd.distinct.sortBy(x => x).zipWithIndex
rdd.keyBy(x => x)
.join(distincts.keyBy(_._1))
.map{
case (value: Int, (v1: Int, (v2: Int, index: Long))) => (index, value)
}.collect()
//res15: Array[(Long, Int)] = Array((3,33), (2,4), (0,1), (0,1), (0,1), (0,1), (4,44), (4,44), (1,2))
顺便说一下,我使用 collect
只是为了可视化目的,在真实的应用程序中你不应该使用它,除非你确定它适合驱动程序的内存。
想在spark中做排名,如下:
输入:
5.6
5.6
5.6
6.2
8.1
5.5
5.5
排名:
1
1
1
2
3
0
0
0
输出:
Rank Input
0 5.5
0 5.5
1 5.6
1 5.6
1 5.6
2 6.2
3 8.1
我想知道如何在 spark 中对这些进行排序并获得与上面列出的相同的排名。要求是:
- 排名从 0 开始,而不是 1
- 这是数百万条记录的示例案例,一个分区可能非常大 - 我很欣赏有关如何使用内部排序方法进行排名的建议
我想在 scala 中做这个。有人可以帮我写代码吗?
如果您希望只有 一些 排名,您可以首先获取所有 distinct
值,将它们收集为 List
并将它们转换为 BroadCast
.下面,我展示了一个肮脏的例子,注意它不能保证输出将被排序(可能有更好的方法,但这是我想到的第一件事):
// Case 1. k is small (fits in the driver and nodes)
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2))
val distincts = rdd.distinct.collect.sortBy(x => x)
val broadcast = sc.broadcast(distincts)
val sdd = rdd.map{
case i: Int => (broadcast.value.asInstanceOf[Array[Int]].indexOf(i), i)
}
sdd.collect()
// Array[(Int, Int)] = Array((0,1), (0,1), (4,44), (2,4), (0,1), (3,33), (4,44), (0,1), (1,2))
在第二种方法中,我使用 Spark 的功能进行排序,在 RDD's documentation 中,您可以找到 zipWithIndex
和 keyBy
的工作原理。
//case 2. k is big, distinct values don't fit in the Driver.
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2))
val distincts = rdd.distinct.sortBy(x => x).zipWithIndex
rdd.keyBy(x => x)
.join(distincts.keyBy(_._1))
.map{
case (value: Int, (v1: Int, (v2: Int, index: Long))) => (index, value)
}.collect()
//res15: Array[(Long, Int)] = Array((3,33), (2,4), (0,1), (0,1), (0,1), (0,1), (4,44), (4,44), (1,2))
顺便说一下,我使用 collect
只是为了可视化目的,在真实的应用程序中你不应该使用它,除非你确定它适合驱动程序的内存。