2个rdds之间的Scala字转换操作
Scala word conversion operation between 2 rdds
有一种数据格式,两列,然后每一列用'\t'隔开,第一列是数字,第二列是meaning.The每一列的数据是String
111 A
112 B
113 C
114 D
115 E
116 F
117 G
118 H
...
其他数据也有两列,每一列的数据都是String。格式是这样的。
111 112:0.75,114:0.43,117:0.21
112 113:0.67,114:0.48,115:0.34,116:0.12
113 114:0.33,118:0.12
...
那我需要把第二个数据的数字翻译成它的具体含义。结果如下:
A B:0.75,D:0.43,G:0.21
B C:0.67,D:0.48,E:0.34,F:0.12
...
PS:这些数据格式都是String!
如何编码?那么如果第一个数据rdd1数据量小,第二个数据rdd2数据量大。 运行在Spark集群上,是否需要使用broadcast,具体方法是什么?谢谢你的回答
这是一种方法。假设您的 RDDs
都是 RDD[(String, String)]
类型,而您的第一个 RDD
较小。
//create your first RDD
val rdd1: RDD[(String, String)] = sc.parallelize(Seq(
("111", "A"),
("112", "B"),
("113", "C"),
("114", "D"),
("115", "E"),
("116", "F"),
("117", "G")))
//as this rdd is small so collect it and convert it to a map
val mapRdd1: Map[String, String] = rdd1.collect.toMap
//broadcast this map to all executors
val bRdd = sc.broadcast(mapRdd1)
//create your second rdd
val rdd2: RDD[(String, String)] = sc.parallelize(Seq(
("111", "112:0.75,114:0.43,117:0.21"),
("112", "113:0.67,114:0.48,115:0.34,116:0.12")))
val result: RDD[(String, String)] = rdd2.map(x => (x._1, //keep first string as it is
x._2.split(",").map(a => a.split(":")) //split second string for the required transformations
//fetch the value from the bradcasted map
.map(t => (bRdd.value(t.head), t.last)).mkString(" ")))
result.foreach(println(_))
//output
//(111,(B,0.75) (D,0.43) (G,0.21))
//(112,(C,0.67) (D,0.48) (E,0.34) (F,0.12))
这假设 rdd2
的所有含义都出现在您的第一个 RDD
中。如果不是,则在从地图中获取值时使用 bRdd.value.getOrElse(t.head,"DEFAULT_VALUE")
.
有一种数据格式,两列,然后每一列用'\t'隔开,第一列是数字,第二列是meaning.The每一列的数据是String
111 A
112 B
113 C
114 D
115 E
116 F
117 G
118 H
...
其他数据也有两列,每一列的数据都是String。格式是这样的。
111 112:0.75,114:0.43,117:0.21
112 113:0.67,114:0.48,115:0.34,116:0.12
113 114:0.33,118:0.12
...
那我需要把第二个数据的数字翻译成它的具体含义。结果如下:
A B:0.75,D:0.43,G:0.21
B C:0.67,D:0.48,E:0.34,F:0.12
...
PS:这些数据格式都是String!
如何编码?那么如果第一个数据rdd1数据量小,第二个数据rdd2数据量大。 运行在Spark集群上,是否需要使用broadcast,具体方法是什么?谢谢你的回答
这是一种方法。假设您的 RDDs
都是 RDD[(String, String)]
类型,而您的第一个 RDD
较小。
//create your first RDD
val rdd1: RDD[(String, String)] = sc.parallelize(Seq(
("111", "A"),
("112", "B"),
("113", "C"),
("114", "D"),
("115", "E"),
("116", "F"),
("117", "G")))
//as this rdd is small so collect it and convert it to a map
val mapRdd1: Map[String, String] = rdd1.collect.toMap
//broadcast this map to all executors
val bRdd = sc.broadcast(mapRdd1)
//create your second rdd
val rdd2: RDD[(String, String)] = sc.parallelize(Seq(
("111", "112:0.75,114:0.43,117:0.21"),
("112", "113:0.67,114:0.48,115:0.34,116:0.12")))
val result: RDD[(String, String)] = rdd2.map(x => (x._1, //keep first string as it is
x._2.split(",").map(a => a.split(":")) //split second string for the required transformations
//fetch the value from the bradcasted map
.map(t => (bRdd.value(t.head), t.last)).mkString(" ")))
result.foreach(println(_))
//output
//(111,(B,0.75) (D,0.43) (G,0.21))
//(112,(C,0.67) (D,0.48) (E,0.34) (F,0.12))
这假设 rdd2
的所有含义都出现在您的第一个 RDD
中。如果不是,则在从地图中获取值时使用 bRdd.value.getOrElse(t.head,"DEFAULT_VALUE")
.