将输入文件中每个记录的特定字段提取到 apache spark 中的 key/value 对中
extracting particular fields of each record from input file into key/value pairs in apache spark
以下是Apache spark 处理的输入文件。每个记录包含四个字段,如图所示。
User ID Movie ID Rating Date
6 9217 2 2005-04-28 00:00:00.000
6 9254 4 2005-04-20 00:00:00.000
6 9330 4 2004-11-17 00:00:00.000
6 9330 5 2004-09-27 00:00:00.000
6 1615 4 2004-09-15 00:00:00.000
6 1659 3 2005-02-18 00:00:00.000
6 9254 4 2005-10-26 00:00:00.000
6 9217 3 2005-11-25 00:00:00.000
6 9217 3 2004-09-15 00:00:00.000
从每个记录中,我需要将 Movie ID
和 Rating
提取到 key/value 对中作为 ex: (Movie ID,Rating )
。我浏览了 Apache spark 的文档并进行了一些冲浪。但是找不到相关的东西。如有任何建议,我们将不胜感激。
我可以使用模式匹配在样本行上完成。但不确定如何应用于输入文件的所有行以生成所需的 key/value 对 (movie Id, Rating)
.
scala> val LogEntry = """^\s*([0-9]+)\s+([0-9]+)\s+(\d{1})\s+(.*)""".r
LogEntry: scala.util.matching.Regex = ^\s*([0-9]+)\s+([0-9]+)\s+(\d{1})\s+(.*)
scala> val LogEntry(userid,movieid,rating,date) = "6 9217 2 2005-04-28 00:00:00.000"
userid: String = 6
movieid: String = 9217
rating: String = 2
date: String = 2005-04-28 00:00:00.000
你可以这样做:
val text= sc.textFile("movies.txt")
val LogEntry = """^\s*([0-9]+)\s+([0-9]+)\s+(\d{1})\s+(.*)""".r
val logEntries = text.flatMap(line => line match {
case LogEntry(userid,movieid,rating,date) => Some((userid.toInt,movieid.toInt,rating.toInt,date))
case _ => None
}).cache()
val movieTotalRating = logEntries.map(line => line match { case (userid,movieid,rating,date) => (movieid,rating)}).reduceByKey((rating1, rating2) => rating1 + rating2)
注意,我在 Optional
return 类型上使用 .flatMap
来删除与 RegEx 不匹配的行(如 header 行)
通过缓存 logEntries
,您可以从同一个解析数据集计算多个统计信息(如此处的 movieTotalRating)。
以下是Apache spark 处理的输入文件。每个记录包含四个字段,如图所示。
User ID Movie ID Rating Date
6 9217 2 2005-04-28 00:00:00.000
6 9254 4 2005-04-20 00:00:00.000
6 9330 4 2004-11-17 00:00:00.000
6 9330 5 2004-09-27 00:00:00.000
6 1615 4 2004-09-15 00:00:00.000
6 1659 3 2005-02-18 00:00:00.000
6 9254 4 2005-10-26 00:00:00.000
6 9217 3 2005-11-25 00:00:00.000
6 9217 3 2004-09-15 00:00:00.000
从每个记录中,我需要将 Movie ID
和 Rating
提取到 key/value 对中作为 ex: (Movie ID,Rating )
。我浏览了 Apache spark 的文档并进行了一些冲浪。但是找不到相关的东西。如有任何建议,我们将不胜感激。
我可以使用模式匹配在样本行上完成。但不确定如何应用于输入文件的所有行以生成所需的 key/value 对 (movie Id, Rating)
.
scala> val LogEntry = """^\s*([0-9]+)\s+([0-9]+)\s+(\d{1})\s+(.*)""".r
LogEntry: scala.util.matching.Regex = ^\s*([0-9]+)\s+([0-9]+)\s+(\d{1})\s+(.*)
scala> val LogEntry(userid,movieid,rating,date) = "6 9217 2 2005-04-28 00:00:00.000"
userid: String = 6
movieid: String = 9217
rating: String = 2
date: String = 2005-04-28 00:00:00.000
你可以这样做:
val text= sc.textFile("movies.txt")
val LogEntry = """^\s*([0-9]+)\s+([0-9]+)\s+(\d{1})\s+(.*)""".r
val logEntries = text.flatMap(line => line match {
case LogEntry(userid,movieid,rating,date) => Some((userid.toInt,movieid.toInt,rating.toInt,date))
case _ => None
}).cache()
val movieTotalRating = logEntries.map(line => line match { case (userid,movieid,rating,date) => (movieid,rating)}).reduceByKey((rating1, rating2) => rating1 + rating2)
注意,我在 Optional
return 类型上使用 .flatMap
来删除与 RegEx 不匹配的行(如 header 行)
通过缓存 logEntries
,您可以从同一个解析数据集计算多个统计信息(如此处的 movieTotalRating)。