Spark rank - Scala: rank based on the second and third elements of a tuple in an RDD

Hi, I want to assign a rank to each row based on the second and third elements of the tuple; sample data is below. The row whose third element is the maximum for its ID should get a 1 as the fourth element. If the third elements are tied, the row with the maximum second element should get the 1 instead. Every other row gets 0 as its fourth element. I hope the requirement is clear:

    (ID,Second,Third)->tuple
    (32609,878,199)
    (32609,832,199)
    (45470,231,199)
    (42482,1001,299)
    (42482,16,291)

Code:

    val rank = matching
      .map { case (x1, x2, x3) => (x1, x2, x3, (x3.toInt * 100000) + x2.toInt) }
      .sortBy(-_._4)
      .groupBy(_._1)

Result of rank.take(10).foreach(println):

(32609,CompactBuffer((32609,878,199,19900878), (32609,832,199,19900832)))
(45470,CompactBuffer((45470,231,199,19900231)))
(42482,CompactBuffer((42482,1001,299,29901001), (42482,16,291,29100016)))
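
For reference, the fourth element in my attempt is just a composite sort key, third * 100000 + second, for example:

    // composite key for the first row (32609,878,199)
    199 * 100000 + 878   // = 19900878

What I am missing is how to turn the grouped CompactBuffers into a 0/1 flag per ID.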

The desired output is:

(32609,878,199,1)
(32609,832,199,0)
(45470,231,199,1)
(42482,1001,299,1)
(42482,16,291,0)

It looks like you could try the following:

    import scala.util.Try

    val rank = matching.flatMap { case (x: String, y: String, z: String) =>
      // keep only rows whose second and third elements parse as Int
      val yInt = Try(y.toInt)
      val zInt = Try(z.toInt)
      if (yInt.isSuccess && zInt.isSuccess) Option((x, (yInt.get, zInt.get)))
      else None
    }.groupByKey().flatMap { case (key: String, tuples: Iterable[(Int, Int)]) =>
      // sort by the original third element desc, then the second element desc
      // (the value pair is (second, third))
      val sorted = tuples.toList.sortBy(x => (-x._2, -x._1))
      val topRank = (key, sorted.head._1, sorted.head._2, 1)
      val restRank = for (tup <- sorted.tail) yield (key, tup._1, tup._2, 0)
      List(topRank) ++ restRank
    }

The initial flatMap does some type checking and reshapes each tuple into a key/value pair. The second flatMap (after the groupByKey) sorts each group by the 3rd and then the 2nd element and recreates the tuples with the rank appended. Also note the import of scala.util.Try, which is needed for this to compile.
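
For example, if matching is an RDD[(String, String, String)] built from your sample data, a quick sanity check could look like the sketch below (assuming a local SparkContext named sc):

    // hypothetical input RDD holding the sample rows as string tuples
    val matching = sc.parallelize(Seq(
      ("32609", "878", "199"),
      ("32609", "832", "199"),
      ("45470", "231", "199"),
      ("42482", "1001", "299"),
      ("42482", "16", "291")))

    // after running the snippet above:
    rank.collect().foreach(println)
    // should print (group order may vary):
    // (32609,878,199,1)
    // (32609,832,199,0)
    // (45470,231,199,1)
    // (42482,1001,299,1)
    // (42482,16,291,0)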

Edit: changed the ranking order based on the comments below.

This should do the trick:

import org.apache.spark.{SparkConf, SparkContext}

object App {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)

    val testData = List((32609,878,199),
      (32609,832,199),
      (45470,231,199),
      (42482,1001,299),
      (42482,16,291))

    val input = sc.parallelize(testData)

    val rank = input.groupBy(_._1).flatMapValues {
      group =>
        // sort by the third element descending, break ties by the second element descending
        val sorted = group.toList.sortWith((a, b) => a._3 > b._3 || (a._3 == b._3 && a._2 > b._2))
        val first = sorted.head
        (first._1, first._2, first._3, 1) :: sorted.tail.map(t => (t._1, t._2, t._3, 0))
    }.map(_._2)

    // assign the partition ID to each item to see that each group is sorted
    val resultWithPartitionID = rank.mapPartitionsWithIndex((id, it) => it.map(x => (id, x)))

    // print the contents of the RDD, elements of different partitions might be interleaved
    resultWithPartitionID foreach println

    val collectedResult = resultWithPartitionID.collect.sortBy(_._1).map(_._2)

    // print collected results
    println(collectedResult.mkString("\n"))
  }
}

Output:

(32609,878,199,1)
(32609,832,199,0)
(45470,231,199,1)
(42482,1001,299,1)
(42482,16,291,0)
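
As a side note, the sortWith can also be written with sortBy and a negated tuple key, which makes the intended ordering (third element descending, ties broken by the second element) a bit more explicit. A minimal sketch, assuming the same input RDD as above:

    // alternative to sortWith: sortBy with a composite (negated) key
    val rankAlt = input.groupBy(_._1).flatMapValues { group =>
      val sorted = group.toList.sortBy(t => (-t._3, -t._2))
      val first = sorted.head
      (first._1, first._2, first._3, 1) :: sorted.tail.map(t => (t._1, t._2, t._3, 0))
    }.map(_._2)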