地图内的迭代查找
Iterative lookup within map
def description(list:Array[String]): Array[String] = {
for (y <- list) yield modulelookup.lookup(take(4)) + " " + brandlookup.lookup(y.drop(4)).toString()
}
val printRDD = outputRDD.collect().map(x=> (description(x._1),x._2))
是我当前的代码。我想在没有收集的情况下这样做。 modulelookup 和 brandlookup 是 RDD。这该怎么做?
如果 modulelookup
和 brandlookup
相对较小,您可以将它们转换为广播变量并用于映射,如下所示:
val modulelookupBD = sc.broadcast(modulelookup.collectAsMap)
val brandlookupBD = sc.broadcast(brandlookup.collectAsMap)
def description(list:Array[String]): Array[String] = list.map(x => {
val module = modulelookupBD.value.getOrElse(x.take(4), "")
val brand = brandlookupBD.value.getOrElse(x.drop(4), "")
s"$module $brand"
})
val printRDD = outputRDD.map{case (xs, y) => (description(xs), y)}
否则没有有效的方法来处理这个问题。您可以尝试 flatMap
、join
和 groupByKey
,但对于任何大型数据集,这种组合都可能非常昂贵。
val indexed = outputRDD.zipWithUniqueId
val flattened = indexed.flatMap{case ((xs, _), id) => xs.map(x => (x, id))}
val withModuleAndBrand = flattened
.map(xid => (xid._1.take(4), xid))
.join(modulelookup)
.values
.map{case ((x, id), module) => (x.drop(4), (id, module))}
.join(brandlookup)
.values
.map{case ((id, module), brand) => (id, s"$module $brand")}
.groupByKey
val final = withModuleAndBrand.join(
indexed.map{case ((_, y), id) => (id, y)}
).values
用 DataFrame 替换 RDD 可以减少样板代码,但性能仍然是一个问题。
def description(list:Array[String]): Array[String] = {
for (y <- list) yield modulelookup.lookup(take(4)) + " " + brandlookup.lookup(y.drop(4)).toString()
}
val printRDD = outputRDD.collect().map(x=> (description(x._1),x._2))
是我当前的代码。我想在没有收集的情况下这样做。 modulelookup 和 brandlookup 是 RDD。这该怎么做?
如果 modulelookup
和 brandlookup
相对较小,您可以将它们转换为广播变量并用于映射,如下所示:
val modulelookupBD = sc.broadcast(modulelookup.collectAsMap)
val brandlookupBD = sc.broadcast(brandlookup.collectAsMap)
def description(list:Array[String]): Array[String] = list.map(x => {
val module = modulelookupBD.value.getOrElse(x.take(4), "")
val brand = brandlookupBD.value.getOrElse(x.drop(4), "")
s"$module $brand"
})
val printRDD = outputRDD.map{case (xs, y) => (description(xs), y)}
否则没有有效的方法来处理这个问题。您可以尝试 flatMap
、join
和 groupByKey
,但对于任何大型数据集,这种组合都可能非常昂贵。
val indexed = outputRDD.zipWithUniqueId
val flattened = indexed.flatMap{case ((xs, _), id) => xs.map(x => (x, id))}
val withModuleAndBrand = flattened
.map(xid => (xid._1.take(4), xid))
.join(modulelookup)
.values
.map{case ((x, id), module) => (x.drop(4), (id, module))}
.join(brandlookup)
.values
.map{case ((id, module), brand) => (id, s"$module $brand")}
.groupByKey
val final = withModuleAndBrand.join(
indexed.map{case ((_, y), id) => (id, y)}
).values
用 DataFrame 替换 RDD 可以减少样板代码,但性能仍然是一个问题。