读取多行文本文件
Reading in multiline text files
给定以下数据集:
电影ID:abgh
电影名称:泰坦尼克号
审稿人:John Smith
得分:3.5
电影ID:adsa
电影名称:勇敢者游戏
reviewer:Mary乔
得分:4.5
...(假设数据位于单个文本文件中,其中始终有 4 行代表一个条目)
给定一个小文本文件,我们在其中尝试使用 Spark 对数据集进行一些分析以获得每个电影 ID 的平均分数,我的讲师建议如下:
将文本文件读取为 RDD
使用过滤器创建 2 个分数和电影 ID 的 RDD,即
val movieID = RDD1.filter(z=>z.contains("movieID")).map(_.split(":")).map(z=>z(1))
val score = RDD1.filter(z=>z.contains("score")).map(_.split(":")).map(z=>z(1).toFloat)
从 (2) 开始,将 2 个 RDD 压缩在一起,我将获得 movieId 与每行得分的状态。
val zip_rdd = movieID.zip(score)
val mean_score = zip_rdd.mapValues(value=>(value,1)).reduceByKey{case((sumL,countL),(sumR, countR))=>(sumL+sumR, countL+countR)}.mapValues{case(sum,count)=>sum/count}
我想知道Spark中数据是分区的,能保证数据是按顺序读取的吗?即电影 ID 和分数来自同一条评论?
提前感谢您的帮助!
编辑:如果不清楚,我可以确定 zip_rdd
中的 key/value 对来自同一条评论吗?我现在正在使用 psuedo 集群(Hortonworks 沙箱),但我想知道如果数据大小急剧增加是否会有任何变化,我最终将使用集群来计算它。
来自 Spark 新手。
很好,因为从磁盘读取可以保持顺序。 filter 是一个窄变换。而 zip 依赖于这个事实。 zip 之前没有广泛的转换。
或者,如果您希望以适当的方式处理压缩值,则可以 zipWithIndex 然后加入。这是一个窄转换,所以没问题。
给定以下数据集:
电影ID:abgh
电影名称:泰坦尼克号
审稿人:John Smith
得分:3.5
电影ID:adsa
电影名称:勇敢者游戏
reviewer:Mary乔
得分:4.5
...(假设数据位于单个文本文件中,其中始终有 4 行代表一个条目)
给定一个小文本文件,我们在其中尝试使用 Spark 对数据集进行一些分析以获得每个电影 ID 的平均分数,我的讲师建议如下:
将文本文件读取为 RDD
使用过滤器创建 2 个分数和电影 ID 的 RDD,即
val movieID = RDD1.filter(z=>z.contains("movieID")).map(_.split(":")).map(z=>z(1))
val score = RDD1.filter(z=>z.contains("score")).map(_.split(":")).map(z=>z(1).toFloat)
从 (2) 开始,将 2 个 RDD 压缩在一起,我将获得 movieId 与每行得分的状态。
val zip_rdd = movieID.zip(score)
val mean_score = zip_rdd.mapValues(value=>(value,1)).reduceByKey{case((sumL,countL),(sumR, countR))=>(sumL+sumR, countL+countR)}.mapValues{case(sum,count)=>sum/count}
我想知道Spark中数据是分区的,能保证数据是按顺序读取的吗?即电影 ID 和分数来自同一条评论?
提前感谢您的帮助!
编辑:如果不清楚,我可以确定 zip_rdd
中的 key/value 对来自同一条评论吗?我现在正在使用 psuedo 集群(Hortonworks 沙箱),但我想知道如果数据大小急剧增加是否会有任何变化,我最终将使用集群来计算它。
来自 Spark 新手。
很好,因为从磁盘读取可以保持顺序。 filter 是一个窄变换。而 zip 依赖于这个事实。 zip 之前没有广泛的转换。
或者,如果您希望以适当的方式处理压缩值,则可以 zipWithIndex 然后加入。这是一个窄转换,所以没问题。