如何通过spark RDD中的键连接两个哈希图

how to join two hashmaps by a key in spark RDD

我有两个格式为

的RDD

{string, HashMap[long,object]}

我想对它们执行连接操作,以便在 Scala 中合并相同键的散列图。

RDD1-> {string1,HashMap[{long a,object},{long b,object}]
RDD2-> {string1,HashMap[{long c,object}]

加入两个RDD后,应该是这样的

RDD->{string1,HashMap[{long a,object},{long b,object},{long c,object}]

任何帮助将不胜感激,我也是 scala 和 spark 的新手。

您可以通过连接两个 RDD 并对映射的元组应用合并函数来实现:

def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] Return an RDD containing all pairs of elements with matching keys in this and other. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in this and (k, v2) is in other. Performs a hash join across the cluster.

def mapValues[U](f: (V) ⇒ U): RDD[(K, U)] Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning.

假设,有一个像 Best way to merge two maps and sum the values of same key?

中讨论的函数合并
def [K] merge(a:K,b:K):K = ???

可能会像

def merge(a:Map[K,V],b:Map[K,V]) = a ++ b

鉴于此,RDD可以先加入

val joined = RDD1.join(RDD2) 

然后映射

val mapped = joined.mapValues( v => merge(v._1,v._2))

结果是一个带有(Key,合并后的 Map)的 RDD。

Update: 更简单的方法就是取union然后key reduce:

(rdd1 union rdd2).reduceByKey(_++_)

旧解,仅供参考。这也可以通过 cogroup 来完成,它收集一个或两个 RDD 中键的值(而 join 将忽略只有一个原始 RDD 中的键的值)。见 ScalaDoc.

然后我们使用 ++ 连接值列表以形成单个值列表,最后 reduce 将值(地图)连接到单个地图。

最后两个步骤可以合并为一个 mapValues 操作:

使用此数据...

val rdd1 = sc.parallelize(List("a"->Map(1->"one", 2->"two")))
val rdd2 = sc.parallelize(List("a"->Map(3->"three")))

...在火花中 shell:

val x = (rdd1 cogroup rdd2).mapValues{ case (a,b) => (a ++ b).reduce(_++_)}

x foreach println

> (a,Map(1 -> one, 2 -> two, 3 -> three))