Spark/Scala:展开一个包含(List[String], String)元组的列表

Spark/Scala: Expand a list of (List[String], String) tuples

基本上this question 仅适用于 Scala。

给定具有以下形式元素的 RDD,我如何进行以下转换

(List[String], String) => (String, String)

例如

([A,B,C], X)
([C,D,E], Y)

to

(A, X)
(B, X)
(C, X)
(C, Y)
(D, Y)
(E, Y)

所以

scala> val l = List((List('a, 'b, 'c) -> 'x), List('c, 'd, 'e) -> 'y)
l: List[(List[Symbol], Symbol)] = List((List('a, 'b, 'c),'x),
                                       (List('c, 'd, 'e),'y))

scala> l.flatMap { case (innerList, c) => innerList.map(_ -> c) }
res0: List[(Symbol, Symbol)] = List(('a,'x), ('b,'x), ('c,'x), ('c,'y),
                                    ('d,'y), ('e,'y))

我认为 RDD flatMapValues 最适合这种情况。

val A = List((List(A,B,C),X),(List(A,B,C),Y))
val rdd = sc.parallelize(A)
val output = rdd.map(x=>(x._2,x._1)).flatMapValues(x=>x)

这会将 X 映射到 List(A,B,C) 中的每个值,从而产生成对的 RDD[(X,A),(X,B),(X,C)... (Y,A),(Y,B),(Y,C)]

  val l = (List(1, 2, 3), "A")
  val result = l._1.map((_, l._2))
  println(result)

会给你:

List((1,A), (2,A), (3,A))

使用 Spark,您可以通过以下方式解决问题:

object App {
  def main(args: Array[String]) {
    val input = Seq((List("A", "B", "C"), "X"), (List("C", "D", "E"), "Y"))

    val conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(input)

    val result = rdd.flatMap {
      case (list, label) => {
        list.map( (_, label))
      }
    }

    result.foreach(println)
  }
}

这将输出:

(C,Y)
(D,Y)
(A,X)
(B,X)
(E,Y)
(C,X)

使用漂亮的理解并使参数通用

    def convert[F, S](input: (List[F], S)): List[(F, S)] = {
    for {
      x <- input._1
    } yield {
      (x, input._2)
    }
  }

一个示例调用

convert(List(1, 2, 3), "A")

会给你

List((1,A), (2,A), (3,A))