将输入文件中每个记录的特定字段提取到 apache spark 中的 key/value 对中

extracting particular fields of each record from input file into key/value pairs in apache spark

以下是Apache spark 处理的输入文件。每个记录包含四个字段,如图所示。

User ID     Movie ID         Rating      Date
6             9217            2       2005-04-28 00:00:00.000
6             9254            4       2005-04-20 00:00:00.000
6             9330            4       2004-11-17 00:00:00.000
6             9330            5       2004-09-27 00:00:00.000
6             1615            4       2004-09-15 00:00:00.000
6             1659            3       2005-02-18 00:00:00.000
6             9254            4       2005-10-26 00:00:00.000
6             9217            3       2005-11-25 00:00:00.000
6             9217            3       2004-09-15 00:00:00.000

从每个记录中,我需要将 Movie IDRating 提取到 key/value 对中作为 ex: (Movie ID,Rating )。我浏览了 Apache spark 的文档并进行了一些冲浪。但是找不到相关的东西。如有任何建议,我们将不胜感激。

我可以使用模式匹配在样本行上完成。但不确定如何应用于输入文件的所有行以生成所需的 key/value 对 (movie Id, Rating).

scala> val LogEntry = """^\s*([0-9]+)\s+([0-9]+)\s+(\d{1})\s+(.*)""".r
LogEntry: scala.util.matching.Regex = ^\s*([0-9]+)\s+([0-9]+)\s+(\d{1})\s+(.*)

scala> val LogEntry(userid,movieid,rating,date) = "6             9217            2       2005-04-28 00:00:00.000"
userid: String = 6
movieid: String = 9217
rating: String = 2
date: String = 2005-04-28 00:00:00.000

你可以这样做:

val text= sc.textFile("movies.txt")
val LogEntry = """^\s*([0-9]+)\s+([0-9]+)\s+(\d{1})\s+(.*)""".r
val logEntries = text.flatMap(line => line match {
  case LogEntry(userid,movieid,rating,date) => Some((userid.toInt,movieid.toInt,rating.toInt,date))
  case _ => None
}).cache()
val movieTotalRating = logEntries.map(line => line match { case (userid,movieid,rating,date) => (movieid,rating)}).reduceByKey((rating1, rating2) => rating1 + rating2)

注意,我在 Optional return 类型上使用 .flatMap 来删除与 RegEx 不匹配的行(如 header 行)

通过缓存 logEntries,您可以从同一个解析数据集计算多个统计信息(如此处的 movieTotalRating)。