想要通过 Scala 解析文件并重新格式化以在 Spark 中创建一个 pairRDD

Want to parse a file and reformat it to create a pairRDD in Spark through Scala

我在文件中有以下形式的数据集:

1: 1664968

2: 3 747213 1664968 1691047 4095634 5535664

3: 9 77935 79583 84707 564578 594898 681805 681886 835470 880698 

4: 145

5: 8 57544 58089 60048 65880 284186 313376 

6: 8

我需要使用 Spark 和 Scala 作为数据预处理的一部分将其转换为如下内容:

1 1664968

2 3

2 747213

2 1664968

2 4095634 

2 5535664

3 9

3 77935

3 79583

3 84707

等等....

任何人都可以提供有关如何完成此操作的意见。 文件中原始行的长度变化如上面的数据集示例所示。

我不确定如何进行此转换。

我试过类似下面的东西,它给了我一对键和分号后的第一个元素。

但我不确定如何遍历整个数据并根据需要生成对。

def main(args: Array[String]): Unit = {
  val sc = new SparkContext(new SparkConf().setAppName("Graphx").setMaster("local"))
  val rawLinks = sc.textFile("src/main/resources/links-simple-sorted-top100.txt")

  rawLinks.take(5).foreach(println)

  val formattedLinks = rawLinks.map{ rows =>
    val fields = rows.split(":")
    val fromVertex = fields(0)
    val toVerticesArray = fields(1).split(" ")
    (fromVertex, toVerticesArray(1))
  }

  val topFive = formattedLinks.take(5)
  topFive.foreach(println)
}

将行拆分为 2 部分并映射到可变列数。

def transform(s: String): Array[String] = { 
  val Array(head, tail) = s.split(":", 2)
  tail.trim.split("""\s+""").map(x => s"$head $x")
}

> transform("2: 3 747213 1664968 1691047 4095634 5535664")
// Array(2 3, 2 747213, 2 1664968, 2 1691047, 2 4095634, 2 5535664)
val rdd = sc.parallelize(List("1: 1664968","2: 3 747213 1664968 1691047 4095634 5535664"))
val keyValues = rdd.flatMap(line => {
  val Array(key, values) = line.split(":",2)
  for(value <- values.trim.split("""\s+""")) 
    yield (key, value.trim)
})
keyValues.collect