Scala 广播加入 "one to many" 关系
Scala broadcast join with "one to many" relationship
我对 Scala 和 RDD 还很陌生。
我有一个非常简单的场景,但似乎很难用 RDD 来实现。
场景:
我有两个 table。一大一小。我广播较小的table。
然后我想加入 table 并最终将加入后的值聚合到最终总数。
代码示例如下:
val bigRDD = sc.parallelize(List(("A",1,"1Jan2000"),("B",2,"1Jan2000"),("C",3,"1Jan2000"),("D",3,"1Jan2000"),("E",3,"1Jan2000")))
val smallRDD = sc.parallelize(List(("A","Fruit","Apples"),("A","ZipCode","1234"),("B","Fruit","Apples"),("B","ZipCode","456")))
val broadcastVar = sc.broadcast(smallRDD.keyBy{ a => (a._1,a._2) } // turn to pair RDD
.collectAsMap() // collect as Map
)
//first join
val joinedRDD = bigRDD.map( accs => {
//get list of groups
val groups = List("Fruit", "ZipCode")
val i = "Fruit"
//for each group
//for(i <- groups) {
if (broadcastVar.value.get(accs._1, i) != None) {
( broadcastVar.value.get(accs._1, i).get._1,
broadcastVar.value.get(accs._1, i).get._2,
accs._2, accs._3)
} else {
None
}
//}
}
)
//expected after this
//("A","Fruit","Apples",1, "1Jan2000"),("B","Fruit","Apples",2, "1Jan2000"),
//("A","ZipCode","1234", 1,"1Jan2000"),("B","ZipCode","456", 2,"1Jan2000")
//then group and sum
//cannot do anything with the joinedRDD!!!
//error == value copy is not a member of Product with Serializable
// Final Expected Result
//("Fruit","Apples",3, "1Jan2000"),("ZipCode","1234", 1,"1Jan2000"),("ZipCode","456", 2,"1Jan2000")
我的问题:
- 这首先是 RDD 的最佳方法吗?
免责声明 - 我已经成功地使用数据帧完成了整个任务。这个想法是创建另一个仅使用 RDD 的版本来比较性能。
- 为什么我的joinedRDD创建后无法识别类型,可以继续在上面使用copy等功能?
- 广播变量时不执行 .collectAsMap() 怎么办?我目前必须将第一个包含到项目中以强制执行唯一性并且不删除任何值。
提前感谢您的帮助!
任何感兴趣的人的最终解决方案
case class dt (group:String, group_key:String, count:Long, date:String)
val bigRDD = sc.parallelize(List(("A",1,"1Jan2000"),("B",2,"1Jan2000"),("C",3,"1Jan2000"),("D",3,"1Jan2000"),("E",3,"1Jan2000")))
val smallRDD = sc.parallelize(List(("A","Fruit","Apples"),("A","ZipCode","1234"),("B","Fruit","Apples"),("B","ZipCode","456")))
val broadcastVar = sc.broadcast(smallRDD.keyBy{ a => (a._1) } // turn to pair RDD
.groupByKey() //to not loose any data
.collectAsMap() // collect as Map
)
//first join
val joinedRDD = bigRDD.flatMap( accs => {
if (broadcastVar.value.get(accs._1) != None) {
val bc = broadcastVar.value.get(accs._1).get
bc.map(p => {
dt(p._2, p._3,accs._2, accs._3)
})
} else {
None
}
}
)
//expected after this
//("Fruit","Apples",1, "1Jan2000"),("Fruit","Apples",2, "1Jan2000"),
//("ZipCode","1234", 1,"1Jan2000"),("ZipCode","456", 2,"1Jan2000")
//then group and sum
var finalRDD = joinedRDD.map(s => {
(s.copy(count=0),s.count) //trick to keep code to minimum (count = 0)
})
.reduceByKey(_ + _)
.map(pair => {
pair._1.copy(count=pair._2)
})
在您的 map 语句中,您 return 一个元组或 None 基于 if 条件。这些类型不匹配,所以你退回到一个常见的超类型,所以 joinedRDD
是一个 RDD[Product with Serializable]
这根本不是你想要的(它基本上是 RDD[Any]
)。您需要确保所有路径 return 类型相同。在这种情况下,您可能需要 Option[(String, String, Int, String)]
。您需要做的就是将元组结果包装成 Some
if (broadcastVar.value.get(accs._1, i) != None) {
Some(( broadcastVar.value.get(accs._1, i).get.group_key,
broadcastVar.value.get(accs._1, i).get.group,
accs._2, accs._3))
} else {
None
}
现在您的类型将匹配。这将使 joinedRDD
和 RDD[Option(String, String, Int, String)]
。现在类型正确,数据可用,但是,这意味着您将需要映射选项以使用元组。如果您不需要最终结果中的 None
值,您可以使用 flatmap
而不是 map
来创建 joinedRDD
,这将为您打开选项,过滤掉所有 None
s.
CollectAsMap
是将 RDD 转换为 Hashmap 的正确方法,但您需要为单个键提供多个值。在使用 collectAsMap
之前,但在将 smallRDD 映射到键值对之后,使用 groupByKey
将单个键的所有值组合在一起。当您从 HashMap 中查找键时,您可以映射值,为每个值创建一条新记录。
我对 Scala 和 RDD 还很陌生。 我有一个非常简单的场景,但似乎很难用 RDD 来实现。
场景: 我有两个 table。一大一小。我广播较小的table。 然后我想加入 table 并最终将加入后的值聚合到最终总数。
代码示例如下:
val bigRDD = sc.parallelize(List(("A",1,"1Jan2000"),("B",2,"1Jan2000"),("C",3,"1Jan2000"),("D",3,"1Jan2000"),("E",3,"1Jan2000")))
val smallRDD = sc.parallelize(List(("A","Fruit","Apples"),("A","ZipCode","1234"),("B","Fruit","Apples"),("B","ZipCode","456")))
val broadcastVar = sc.broadcast(smallRDD.keyBy{ a => (a._1,a._2) } // turn to pair RDD
.collectAsMap() // collect as Map
)
//first join
val joinedRDD = bigRDD.map( accs => {
//get list of groups
val groups = List("Fruit", "ZipCode")
val i = "Fruit"
//for each group
//for(i <- groups) {
if (broadcastVar.value.get(accs._1, i) != None) {
( broadcastVar.value.get(accs._1, i).get._1,
broadcastVar.value.get(accs._1, i).get._2,
accs._2, accs._3)
} else {
None
}
//}
}
)
//expected after this
//("A","Fruit","Apples",1, "1Jan2000"),("B","Fruit","Apples",2, "1Jan2000"),
//("A","ZipCode","1234", 1,"1Jan2000"),("B","ZipCode","456", 2,"1Jan2000")
//then group and sum
//cannot do anything with the joinedRDD!!!
//error == value copy is not a member of Product with Serializable
// Final Expected Result
//("Fruit","Apples",3, "1Jan2000"),("ZipCode","1234", 1,"1Jan2000"),("ZipCode","456", 2,"1Jan2000")
我的问题:
- 这首先是 RDD 的最佳方法吗? 免责声明 - 我已经成功地使用数据帧完成了整个任务。这个想法是创建另一个仅使用 RDD 的版本来比较性能。
- 为什么我的joinedRDD创建后无法识别类型,可以继续在上面使用copy等功能?
- 广播变量时不执行 .collectAsMap() 怎么办?我目前必须将第一个包含到项目中以强制执行唯一性并且不删除任何值。
提前感谢您的帮助!
任何感兴趣的人的最终解决方案
case class dt (group:String, group_key:String, count:Long, date:String)
val bigRDD = sc.parallelize(List(("A",1,"1Jan2000"),("B",2,"1Jan2000"),("C",3,"1Jan2000"),("D",3,"1Jan2000"),("E",3,"1Jan2000")))
val smallRDD = sc.parallelize(List(("A","Fruit","Apples"),("A","ZipCode","1234"),("B","Fruit","Apples"),("B","ZipCode","456")))
val broadcastVar = sc.broadcast(smallRDD.keyBy{ a => (a._1) } // turn to pair RDD
.groupByKey() //to not loose any data
.collectAsMap() // collect as Map
)
//first join
val joinedRDD = bigRDD.flatMap( accs => {
if (broadcastVar.value.get(accs._1) != None) {
val bc = broadcastVar.value.get(accs._1).get
bc.map(p => {
dt(p._2, p._3,accs._2, accs._3)
})
} else {
None
}
}
)
//expected after this
//("Fruit","Apples",1, "1Jan2000"),("Fruit","Apples",2, "1Jan2000"),
//("ZipCode","1234", 1,"1Jan2000"),("ZipCode","456", 2,"1Jan2000")
//then group and sum
var finalRDD = joinedRDD.map(s => {
(s.copy(count=0),s.count) //trick to keep code to minimum (count = 0)
})
.reduceByKey(_ + _)
.map(pair => {
pair._1.copy(count=pair._2)
})
在您的 map 语句中,您 return 一个元组或 None 基于 if 条件。这些类型不匹配,所以你退回到一个常见的超类型,所以 joinedRDD
是一个 RDD[Product with Serializable]
这根本不是你想要的(它基本上是 RDD[Any]
)。您需要确保所有路径 return 类型相同。在这种情况下,您可能需要 Option[(String, String, Int, String)]
。您需要做的就是将元组结果包装成 Some
if (broadcastVar.value.get(accs._1, i) != None) {
Some(( broadcastVar.value.get(accs._1, i).get.group_key,
broadcastVar.value.get(accs._1, i).get.group,
accs._2, accs._3))
} else {
None
}
现在您的类型将匹配。这将使 joinedRDD
和 RDD[Option(String, String, Int, String)]
。现在类型正确,数据可用,但是,这意味着您将需要映射选项以使用元组。如果您不需要最终结果中的 None
值,您可以使用 flatmap
而不是 map
来创建 joinedRDD
,这将为您打开选项,过滤掉所有 None
s.
CollectAsMap
是将 RDD 转换为 Hashmap 的正确方法,但您需要为单个键提供多个值。在使用 collectAsMap
之前,但在将 smallRDD 映射到键值对之后,使用 groupByKey
将单个键的所有值组合在一起。当您从 HashMap 中查找键时,您可以映射值,为每个值创建一条新记录。