2个rdds之间的Scala字转换操作

Scala word conversion operation between 2 rdds

有一种数据格式,两列,然后每一列用'\t'隔开,第一列是数字,第二列是meaning.The每一列的数据是String

111  A
112  B
113  C
114  D
115  E
116  F
117  G
118  H
...

其他数据也有两列,每一列的数据都是String。格式是这样的。

111  112:0.75,114:0.43,117:0.21
112  113:0.67,114:0.48,115:0.34,116:0.12
113  114:0.33,118:0.12
...

那我需要把第二个数据的数字翻译成它的具体含义。结果如下:

 A  B:0.75,D:0.43,G:0.21
 B  C:0.67,D:0.48,E:0.34,F:0.12
 ...

PS:这些数据格式都是String!

如何编码?那么如果第一个数据rdd1数据量小,第二个数据rdd2数据量大。 运行在Spark集群上,是否需要使用broadcast,具体方法是什么?谢谢你的回答

这是一种方法。假设您的 RDDs 都是 RDD[(String, String)] 类型,而您的第一个 RDD 较小。

//create your first RDD
val rdd1: RDD[(String, String)] = sc.parallelize(Seq(
  ("111", "A"),
  ("112", "B"),
  ("113", "C"),
  ("114", "D"),
  ("115", "E"),
  ("116", "F"),
  ("117", "G")))

//as this rdd is small so collect it and convert it to a map
val mapRdd1: Map[String, String] = rdd1.collect.toMap
//broadcast this map to all executors
val bRdd = sc.broadcast(mapRdd1)

//create your second rdd
val rdd2: RDD[(String, String)] = sc.parallelize(Seq(
  ("111", "112:0.75,114:0.43,117:0.21"),
  ("112", "113:0.67,114:0.48,115:0.34,116:0.12")))

val result: RDD[(String, String)] = rdd2.map(x => (x._1, //keep first string as it is
  x._2.split(",").map(a => a.split(":")) //split second string for the required transformations
    //fetch the value from the bradcasted map
    .map(t => (bRdd.value(t.head), t.last)).mkString(" ")))

result.foreach(println(_))

//output
//(111,(B,0.75) (D,0.43) (G,0.21))
//(112,(C,0.67) (D,0.48) (E,0.34) (F,0.12))

这假设 rdd2 的所有含义都出现在您的第一个 RDD 中。如果不是,则在从地图中获取值时使用 bRdd.value.getOrElse(t.head,"DEFAULT_VALUE").