Scala broadcast join with "one to many" relationship

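I'm fairly new to Scala and RDDs. I have a very simple scenario, but it seems hard to implement with RDDs.

Scenario: I have two tables, one big and one small. I broadcast the smaller table. Then I want to join the tables and finally aggregate the joined values into final totals.

Here is a code sample: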

val bigRDD = sc.parallelize(List(("A",1,"1Jan2000"),("B",2,"1Jan2000"),("C",3,"1Jan2000"),("D",3,"1Jan2000"),("E",3,"1Jan2000")))
val smallRDD = sc.parallelize(List(("A","Fruit","Apples"),("A","ZipCode","1234"),("B","Fruit","Apples"),("B","ZipCode","456")))

val broadcastVar = sc.broadcast(smallRDD.keyBy{ a => (a._1,a._2) }  // turn to pair RDD
  .collectAsMap()               // collect as Map
)

//first join
val joinedRDD = bigRDD.map( accs => {
  //get list of groups
  val groups = List("Fruit", "ZipCode")
  val i = "Fruit"
  //for each group
  //for(i <- groups) {
  if (broadcastVar.value.get(accs._1, i) != None) {
    ( broadcastVar.value.get(accs._1, i).get._1,
      broadcastVar.value.get(accs._1, i).get._2,
      accs._2, accs._3)
  } else {
    None
  }
  //}
}
)
//expected after this
//("A","Fruit","Apples",1, "1Jan2000"),("B","Fruit","Apples",2, "1Jan2000"),
//("A","ZipCode","1234", 1,"1Jan2000"),("B","ZipCode","456", 2,"1Jan2000")

//then group and sum
//cannot do anything with the joinedRDD!!!
//error == value copy is not a member of Product with Serializable

// Final Expected Result
//("Fruit","Apples",3, "1Jan2000"),("ZipCode","1234", 1,"1Jan2000"),("ZipCode","456", 2,"1Jan2000")

My question:

Thanks in advance for your help!


Final solution, for anyone interested

case class dt (group:String, group_key:String, count:Long, date:String)

val bigRDD = sc.parallelize(List(("A",1,"1Jan2000"),("B",2,"1Jan2000"),("C",3,"1Jan2000"),("D",3,"1Jan2000"),("E",3,"1Jan2000")))
val smallRDD = sc.parallelize(List(("A","Fruit","Apples"),("A","ZipCode","1234"),("B","Fruit","Apples"),("B","ZipCode","456")))

val broadcastVar = sc.broadcast(smallRDD.keyBy{ a => (a._1) }  // turn to pair RDD
    .groupByKey()                 // keep every row for a key, not just one (to not lose any data)
    .collectAsMap()               // collect as Map[String, Iterable[(String, String, String)]]
)

//first join: one dt per matching small-table row, nothing for ids with no match
val joinedRDD = bigRDD.flatMap { case (key, count, date) =>
  broadcastVar.value
    .getOrElse(key, Iterable.empty[(String, String, String)])
    .map { case (_, group, groupKey) => dt(group, groupKey, count, date) }
}
//expected after this
//("Fruit","Apples",1, "1Jan2000"),("Fruit","Apples",2, "1Jan2000"),
//("ZipCode","1234", 1,"1Jan2000"),("ZipCode","456", 2,"1Jan2000")

//then group and sum
val finalRDD = joinedRDD
  .map(s => (s.copy(count = 0), s.count))  // trick to keep code to minimum: count = 0 makes rows that differ only in count share a key
  .reduceByKey(_ + _)
  .map { case (key, total) => key.copy(count = total) }
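To see why the `copy(count = 0)` trick works, here is a local sketch that uses plain Scala collections instead of Spark, so it runs without a cluster. The object name `CopyTrick` is hypothetical, `groupBy` followed by a sum stands in for `reduceByKey(_ + _)`, and the input rows are the intermediate records listed in the "expected after this" comments of the solution.

```scala
object CopyTrick {
  // Same fields as the final solution's case class.
  case class dt(group: String, group_key: String, count: Long, date: String)

  // The intermediate rows expected after the join.
  val joined: List[dt] = List(
    dt("Fruit", "Apples", 1, "1Jan2000"),
    dt("Fruit", "Apples", 2, "1Jan2000"),
    dt("ZipCode", "1234", 1, "1Jan2000"),
    dt("ZipCode", "456", 2, "1Jan2000")
  )

  // copy(count = 0) yields a key that differs between rows only when group,
  // group_key or date differ; grouping on it and summing the counts is a
  // local stand-in for reduceByKey(_ + _).
  val totals: Set[dt] = joined
    .map(s => (s.copy(count = 0), s.count))
    .groupBy(_._1)
    .map { case (key, kvs) => key.copy(count = kvs.map(_._2).sum) }
    .toSet

  def main(args: Array[String]): Unit = totals.foreach(println)
}
```

This works because a case class gets structural `equals` and `hashCode` over all its fields, so the zeroed copy is a valid grouping key. A tuple key such as `(s.group, s.group_key, s.date)` would do the same job without the placeholder value.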

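In your map statement, you return either a tuple or None depending on the if condition. These types don't match, so the compiler falls back to their common supertype, and joinedRDD becomes an RDD[Product with Serializable], which is not what you want at all (it is essentially an RDD[Any]). You need to make sure every branch returns the same type. In this case you probably want Option[(String, String, Int, String)]. All you need to do is wrap the tuple result in Some: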

  if (broadcastVar.value.get((accs._1, i)) != None) {
    Some(( broadcastVar.value.get((accs._1, i)).get._2,  // group, e.g. "Fruit"
      broadcastVar.value.get((accs._1, i)).get._3,        // group key, e.g. "Apples"
      accs._2, accs._3))
  } else {
    None
  }
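The fix can be tried without Spark. In this local sketch, a plain `Map` (hypothetically named `lookup`) stands in for `broadcastVar.value` and a `List` stands in for `bigRDD`. The explicit `Option[(String, String, Int, String)]` annotations only compile because both branches now agree; the second version shows that `Option.map` gives the same result without the `!= None` check and the repeated `.get` calls.

```scala
object OptionFix {
  // Hypothetical stand-in for broadcastVar.value: keyed by (id, group),
  // valued by the original small-table row.
  val lookup: Map[(String, String), (String, String, String)] = Map(
    ("A", "Fruit") -> (("A", "Fruit", "Apples")),
    ("B", "Fruit") -> (("B", "Fruit", "Apples"))
  )

  // Local stand-in for bigRDD.
  val big: List[(String, Int, String)] =
    List(("A", 1, "1Jan2000"), ("B", 2, "1Jan2000"), ("C", 3, "1Jan2000"))

  val group = "Fruit"

  // if/else version: both branches are now Option[(String, String, Int, String)].
  val withIf: List[Option[(String, String, Int, String)]] = big.map { case (key, n, date) =>
    if (lookup.contains((key, group))) {
      val (_, g, gk) = lookup((key, group))
      Some((g, gk, n, date))
    } else {
      None
    }
  }

  // Option.map version: the tuple is only built when the lookup succeeds.
  val withMap: List[Option[(String, String, Int, String)]] = big.map { case (key, n, date) =>
    lookup.get((key, group)).map { case (_, g, gk) => (g, gk, n, date) }
  }

  def main(args: Array[String]): Unit = withMap.foreach(println)
}
```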

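Now your types match, and joinedRDD is an RDD[Option[(String, String, Int, String)]]. The type is correct and the data is usable, but you will need to map over the Options to get at the tuples. If you don't need the None values in the final result, you can use flatMap instead of map to create joinedRDD; that unwraps the Options for you and filters out all the Nones.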
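A minimal local illustration of the map versus flatMap difference, again with a `List` in place of the RDD. `joinRow` is a hypothetical helper that returns `Some` on a hit and `None` on a miss; the same idea applies to `RDD.flatMap`, where each `Option` is treated as a collection of zero or one elements.

```scala
object FlatMapOptions {
  // Hypothetical per-row join: Some(joined row) for ids "A" and "B", None otherwise.
  def joinRow(row: (String, Int, String)): Option[(String, String, Int, String)] =
    if (row._1 == "A" || row._1 == "B") Some(("Fruit", "Apples", row._2, row._3)) else None

  // Local stand-in for bigRDD.
  val big: List[(String, Int, String)] =
    List(("A", 1, "1Jan2000"), ("B", 2, "1Jan2000"), ("C", 3, "1Jan2000"))

  // map: one Option per input row, so the miss for "C" stays as a None.
  val mapped: List[Option[(String, String, Int, String)]] = big.map(row => joinRow(row))

  // flatMap: each Some is unwrapped and each None is dropped.
  val flattened: List[(String, String, Int, String)] = big.flatMap(row => joinRow(row))

  def main(args: Array[String]): Unit = {
    println(mapped)    // three elements, the last one None
    println(flattened) // two plain tuples
  }
}
```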

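collectAsMap is the right way to turn a pair RDD into a local map (a HashMap), but it keeps only one value per key, and here a single key needs several values. After mapping smallRDD to key-value pairs but before calling collectAsMap, use groupByKey to gather all the values for each key. Then, when you look up a key in the resulting map, you can map over its values and create a new record for each one.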
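A local sketch of that difference, with plain Scala collections standing in for the RDD calls: `toMap` on the ungrouped pairs behaves like `collectAsMap` in that only one value per key survives (for `toMap`, the last one seen), while `groupBy` plays the role of `groupByKey` followed by `collectAsMap`. The object name `OneToMany` is hypothetical.

```scala
object OneToMany {
  // The small table from the question, as local rows.
  val small: List[(String, String, String)] = List(
    ("A", "Fruit", "Apples"), ("A", "ZipCode", "1234"),
    ("B", "Fruit", "Apples"), ("B", "ZipCode", "456"))

  // Key by the id column only, like smallRDD.keyBy(_._1).
  val pairs: List[(String, (String, String, String))] = small.map(r => (r._1, r))

  // Like collectAsMap on ungrouped pairs: one value per key survives
  // (with toMap, the last one), so the "Fruit" row for "A" is lost.
  val lossy: Map[String, (String, String, String)] = pairs.toMap

  // Like groupByKey().collectAsMap(): all rows for a key are kept.
  val grouped: Map[String, List[(String, String, String)]] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }

  // Local stand-in for bigRDD.
  val big: List[(String, Int, String)] =
    List(("A", 1, "1Jan2000"), ("B", 2, "1Jan2000"), ("C", 3, "1Jan2000"))

  // One output record per matching small-table row; "C" has no match and yields nothing.
  val expanded: List[(String, String, Int, String)] = big.flatMap { case (key, n, date) =>
    grouped.getOrElse(key, Nil).map { case (_, group, groupKey) => (group, groupKey, n, date) }
  }

  def main(args: Array[String]): Unit = expanded.foreach(println)
}
```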