在 spark 中使用 scala 转换数据
Transform data using scala in spark
我正在尝试将输入文本文件转换为 Key/Value RDD,但下面的代码不起作用。(文本文件是制表符分隔文件。)我对 Scala 很陌生,并且Spark 所以非常感谢你的帮助。
import org.apache.spark.{SparkConf, SparkContext}
import scala.io.Source
object shortTwitter {
def main(args: Array[String]): Unit = {
for (line <- Source.fromFile(args(1).txt).getLines()) {
val newLine = line.map(line =>
val p = line.split("\t")
(p(0).toString, p(1).toInt)
)
}
val sparkConf = new SparkConf().setAppName("ShortTwitterAnalysis").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val text = sc.textFile(args(0))
val counts = text.flatMap(line => line.split("\t"))
}
}
我假设您希望生成的 RDD 具有 RDD[(String, Int)]
类型,所以 -
- 您应该使用
map
(将每条记录转换为一条新记录)而不是 flatMap
(将每条记录转换为 多条 条记录)
- 您应该将
split
的结果映射到一个元组中
总计:
val counts = text
.map(line => line.split("\t"))
.map(arr => (arr(0), arr(1).toInt))
EDIT 根据评论中的说明:如果您也有兴趣修复非 Spark 部分(按顺序读取文件),您在 for-理解语法,这是全部内容:
def main(args: Array[String]): Unit = {
// read the file without Spark (not necessary when using Spark):
val countsWithoutSpark: Iterator[(String, Int)] = for {
line <- Source.fromFile(args(1)).getLines()
} yield {
val p = line.split("\t")
(p(0), p(1).toInt)
}
// equivalent code using Spark:
val sparkConf = new SparkConf().setAppName("ShortTwitterAnalysis").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val counts: RDD[(String, Int)] = sc.textFile(args(0))
.map(line => line.split("\t"))
.map(arr => (arr(0), arr(1).toInt))
}
我正在尝试将输入文本文件转换为 Key/Value RDD,但下面的代码不起作用。(文本文件是制表符分隔文件。)我对 Scala 很陌生,并且Spark 所以非常感谢你的帮助。
import org.apache.spark.{SparkConf, SparkContext}
import scala.io.Source
object shortTwitter {
def main(args: Array[String]): Unit = {
for (line <- Source.fromFile(args(1).txt).getLines()) {
val newLine = line.map(line =>
val p = line.split("\t")
(p(0).toString, p(1).toInt)
)
}
val sparkConf = new SparkConf().setAppName("ShortTwitterAnalysis").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val text = sc.textFile(args(0))
val counts = text.flatMap(line => line.split("\t"))
}
}
我假设您希望生成的 RDD 具有 RDD[(String, Int)]
类型,所以 -
- 您应该使用
map
(将每条记录转换为一条新记录)而不是flatMap
(将每条记录转换为 多条 条记录) - 您应该将
split
的结果映射到一个元组中
总计:
val counts = text
.map(line => line.split("\t"))
.map(arr => (arr(0), arr(1).toInt))
EDIT 根据评论中的说明:如果您也有兴趣修复非 Spark 部分(按顺序读取文件),您在 for-理解语法,这是全部内容:
def main(args: Array[String]): Unit = {
// read the file without Spark (not necessary when using Spark):
val countsWithoutSpark: Iterator[(String, Int)] = for {
line <- Source.fromFile(args(1)).getLines()
} yield {
val p = line.split("\t")
(p(0), p(1).toInt)
}
// equivalent code using Spark:
val sparkConf = new SparkConf().setAppName("ShortTwitterAnalysis").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val counts: RDD[(String, Int)] = sc.textFile(args(0))
.map(line => line.split("\t"))
.map(arr => (arr(0), arr(1).toInt))
}