如何通过spark RDD中的键连接两个哈希图
how to join two hashmaps by a key in spark RDD
我有两个格式为
的RDD
{string, HashMap[long,object]}
我想对它们执行连接操作,以便在 Scala 中合并相同键的散列图。
RDD1-> {string1,HashMap[{long a,object},{long b,object}]
RDD2-> {string1,HashMap[{long c,object}]
加入两个RDD后,应该是这样的
RDD->{string1,HashMap[{long a,object},{long b,object},{long c,object}]
任何帮助将不胜感激,我也是 scala 和 spark 的新手。
您可以通过连接两个 RDD 并对映射的元组应用合并函数来实现:
def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))]
Return an RDD containing all pairs of elements with matching keys in
this and other. Each pair of elements will be returned as a (k, (v1,
v2)) tuple, where (k, v1) is in this and (k, v2) is in other. Performs
a hash join across the cluster.
def mapValues[U](f: (V) ⇒ U): RDD[(K, U)] Pass each value in the
key-value pair RDD through a map function without changing the keys;
this also retains the original RDD's partitioning.
假设,有一个像 Best way to merge two maps and sum the values of same key?
中讨论的函数合并
def [K] merge(a:K,b:K):K = ???
可能会像
def merge(a:Map[K,V],b:Map[K,V]) = a ++ b
鉴于此,RDD可以先加入
val joined = RDD1.join(RDD2)
然后映射
val mapped = joined.mapValues( v => merge(v._1,v._2))
结果是一个带有(Key,合并后的 Map)的 RDD。
Update: 更简单的方法就是取union然后key reduce:
(rdd1 union rdd2).reduceByKey(_++_)
旧解,仅供参考。这也可以通过 cogroup
来完成,它收集一个或两个 RDD 中键的值(而 join
将忽略只有一个原始 RDD 中的键的值)。见 ScalaDoc.
然后我们使用 ++
连接值列表以形成单个值列表,最后 reduce
将值(地图)连接到单个地图。
最后两个步骤可以合并为一个 mapValues
操作:
使用此数据...
val rdd1 = sc.parallelize(List("a"->Map(1->"one", 2->"two")))
val rdd2 = sc.parallelize(List("a"->Map(3->"three")))
...在火花中 shell:
val x = (rdd1 cogroup rdd2).mapValues{ case (a,b) => (a ++ b).reduce(_++_)}
x foreach println
> (a,Map(1 -> one, 2 -> two, 3 -> three))
我有两个格式为
的RDD{string, HashMap[long,object]}
我想对它们执行连接操作,以便在 Scala 中合并相同键的散列图。
RDD1-> {string1,HashMap[{long a,object},{long b,object}]
RDD2-> {string1,HashMap[{long c,object}]
加入两个RDD后,应该是这样的
RDD->{string1,HashMap[{long a,object},{long b,object},{long c,object}]
任何帮助将不胜感激,我也是 scala 和 spark 的新手。
您可以通过连接两个 RDD 并对映射的元组应用合并函数来实现:
def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] Return an RDD containing all pairs of elements with matching keys in this and other. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in this and (k, v2) is in other. Performs a hash join across the cluster.
def mapValues[U](f: (V) ⇒ U): RDD[(K, U)] Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning.
假设,有一个像 Best way to merge two maps and sum the values of same key?
中讨论的函数合并def [K] merge(a:K,b:K):K = ???
可能会像
def merge(a:Map[K,V],b:Map[K,V]) = a ++ b
鉴于此,RDD可以先加入
val joined = RDD1.join(RDD2)
然后映射
val mapped = joined.mapValues( v => merge(v._1,v._2))
结果是一个带有(Key,合并后的 Map)的 RDD。
Update: 更简单的方法就是取union然后key reduce:
(rdd1 union rdd2).reduceByKey(_++_)
旧解,仅供参考。这也可以通过 cogroup
来完成,它收集一个或两个 RDD 中键的值(而 join
将忽略只有一个原始 RDD 中的键的值)。见 ScalaDoc.
然后我们使用 ++
连接值列表以形成单个值列表,最后 reduce
将值(地图)连接到单个地图。
最后两个步骤可以合并为一个 mapValues
操作:
使用此数据...
val rdd1 = sc.parallelize(List("a"->Map(1->"one", 2->"two")))
val rdd2 = sc.parallelize(List("a"->Map(3->"three")))
...在火花中 shell:
val x = (rdd1 cogroup rdd2).mapValues{ case (a,b) => (a ++ b).reduce(_++_)}
x foreach println
> (a,Map(1 -> one, 2 -> two, 3 -> three))