How to join 2 RDDs in Spark Scala
I have 2 RDDs as shown below:
val rdd1 = spark.sparkContext.parallelize(Seq((123, List(("000000011119",20),("000000011120",30),("000000011121",50))),(234, List(("000000011119",20),("000000011120",30),("000000011121",50)))))
val rdd2 = spark.sparkContext.parallelize(Seq((123, List("000000011119","000000011120")),(234, List("000000011121","000000011120"))))
I want to add up the values in rdd1, keeping only the entries whose ids appear in rdd2's list for the same outer key.
Required output:
RDD[(123,50),(234,80)]
Any help would be appreciated.
Effectively this is a join on the first element of each row combined with the first element of each inner list entry.
So I broke each RDD out into one row per (key, id) pair and joined them that way:
val flat1 = rdd1.flatMap(r => r._2.map(e => ((r._1, e._1), e._2))) // looks like ((234,000000011119),20)
val flat2 = rdd2.flatMap(r => r._2.map(e => ((r._1, e), true))) // looks like ((234,000000011121),true)
val res = flat1.join(flat2)
.map(r => (r._1._1, r._2._1)) // looks like (123, 30)
.reduceByKey(_ + _) // total each key group
res.foreach(println)
scala> :pas
// Entering paste mode (ctrl-D to finish)
flat1.join(flat2)
.map(r => (r._1._1, r._2._1)) // looks like (123, 30)
.reduceByKey(_ + _) // total each key group
.foreach(println)
// Exiting paste mode, now interpreting.
(123,50)
(234,80)
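If each outer key occurs at most once per RDD (as in this example), you could also skip the flatten step and join on the outer key alone, filtering the inner list against a set. A minimal sketch under that assumption:

// Assumes each outer key appears once in each RDD; join on the outer key,
// then keep only the values whose ids appear in rdd2's list.
val res2 = rdd1.join(rdd2).mapValues { case (pairs, ids) =>
  val wanted = ids.toSet
  pairs.collect { case (id, v) if wanted(id) => v }.sum
}
res2.foreach(println) // (123,50) and (234,80)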
As usual, this kind of thing is much simpler with Datasets, so that would be my recommendation going forward.
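For reference, a minimal Dataset sketch of the same computation; the column names "key", "id", "pairs", "ids", and "value" are my own choices, since the original data carries no schema:

import org.apache.spark.sql.functions.{explode, sum}
import spark.implicits._

// Flatten rdd1 into one row per (key, id, value).
val df1 = rdd1.toDF("key", "pairs")
  .select($"key", explode($"pairs").as("pair"))
  .select($"key", $"pair._1".as("id"), $"pair._2".as("value"))

// Flatten rdd2 into one row per (key, id).
val df2 = rdd2.toDF("key", "ids")
  .select($"key", explode($"ids").as("id"))

// Inner join keeps only the (key, id) pairs present in both, then sum per key.
df1.join(df2, Seq("key", "id"))
  .groupBy("key")
  .agg(sum("value"))
  .show()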