想要通过 Scala 解析文件并重新格式化以在 Spark 中创建一个 pairRDD
Want to parse a file and reformat it to create a pairRDD in Spark through Scala
我在文件中有以下形式的数据集:
1: 1664968
2: 3 747213 1664968 1691047 4095634 5535664
3: 9 77935 79583 84707 564578 594898 681805 681886 835470 880698
4: 145
5: 8 57544 58089 60048 65880 284186 313376
6: 8
我需要使用 Spark 和 Scala 作为数据预处理的一部分将其转换为如下内容:
1 1664968
2 3
2 747213
2 1664968
2 4095634
2 5535664
3 9
3 77935
3 79583
3 84707
等等....
任何人都可以提供有关如何完成此操作的意见。
文件中原始行的长度变化如上面的数据集示例所示。
我不确定如何进行此转换。
我试过类似下面的东西,它给了我一对键和分号后的第一个元素。
但我不确定如何遍历整个数据并根据需要生成对。
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("Graphx").setMaster("local"))
val rawLinks = sc.textFile("src/main/resources/links-simple-sorted-top100.txt")
rawLinks.take(5).foreach(println)
val formattedLinks = rawLinks.map{ rows =>
val fields = rows.split(":")
val fromVertex = fields(0)
val toVerticesArray = fields(1).split(" ")
(fromVertex, toVerticesArray(1))
}
val topFive = formattedLinks.take(5)
topFive.foreach(println)
}
将行拆分为 2 部分并映射到可变列数。
def transform(s: String): Array[String] = {
val Array(head, tail) = s.split(":", 2)
tail.trim.split("""\s+""").map(x => s"$head $x")
}
> transform("2: 3 747213 1664968 1691047 4095634 5535664")
// Array(2 3, 2 747213, 2 1664968, 2 1691047, 2 4095634, 2 5535664)
val rdd = sc.parallelize(List("1: 1664968","2: 3 747213 1664968 1691047 4095634 5535664"))
val keyValues = rdd.flatMap(line => {
val Array(key, values) = line.split(":",2)
for(value <- values.trim.split("""\s+"""))
yield (key, value.trim)
})
keyValues.collect
我在文件中有以下形式的数据集:
1: 1664968
2: 3 747213 1664968 1691047 4095634 5535664
3: 9 77935 79583 84707 564578 594898 681805 681886 835470 880698
4: 145
5: 8 57544 58089 60048 65880 284186 313376
6: 8
我需要使用 Spark 和 Scala 作为数据预处理的一部分将其转换为如下内容:
1 1664968
2 3
2 747213
2 1664968
2 4095634
2 5535664
3 9
3 77935
3 79583
3 84707
等等....
任何人都可以提供有关如何完成此操作的意见。 文件中原始行的长度变化如上面的数据集示例所示。
我不确定如何进行此转换。
我试过类似下面的东西,它给了我一对键和分号后的第一个元素。
但我不确定如何遍历整个数据并根据需要生成对。
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("Graphx").setMaster("local"))
val rawLinks = sc.textFile("src/main/resources/links-simple-sorted-top100.txt")
rawLinks.take(5).foreach(println)
val formattedLinks = rawLinks.map{ rows =>
val fields = rows.split(":")
val fromVertex = fields(0)
val toVerticesArray = fields(1).split(" ")
(fromVertex, toVerticesArray(1))
}
val topFive = formattedLinks.take(5)
topFive.foreach(println)
}
将行拆分为 2 部分并映射到可变列数。
def transform(s: String): Array[String] = {
val Array(head, tail) = s.split(":", 2)
tail.trim.split("""\s+""").map(x => s"$head $x")
}
> transform("2: 3 747213 1664968 1691047 4095634 5535664")
// Array(2 3, 2 747213, 2 1664968, 2 1691047, 2 4095634, 2 5535664)
val rdd = sc.parallelize(List("1: 1664968","2: 3 747213 1664968 1691047 4095634 5535664"))
val keyValues = rdd.flatMap(line => {
val Array(key, values) = line.split(":",2)
for(value <- values.trim.split("""\s+"""))
yield (key, value.trim)
})
keyValues.collect