Apache Spark - 推文处理
Apache Spark - Tweets Processing
鉴于我需要的大量推文数据集:
- 提取并计算主题标签。
- 提取并计算 emoticons/emojis。
- 提取并计算单词(引理)
所以,我首先想到的是做这样的事情:
val tweets = sparkContext.textFile(DATASET).cache
val hashtags = tweets
.map(extractHashTags)
.map((_, 1))
.reduceByKey(_ + _)
val emoticonsEmojis = tweets
.map(extractEmoticonsEmojis)
.map((_, 1))
.reduceByKey(_ + _)
val lemmas = tweets
.map(extractLemmas)
.map((_, 1))
.reduceByKey(_ + _)
但是这样每条推文都处理了3次,对不对?如果是这样,是否有一种有效的方法通过只处理每条推文一次来单独计算所有这些元素?
我在想这样的事情:
sparkContext.textFile(DATASET)
.map(extractor) // RDD[(List[String], List[String], List[String])]
但这样一来就变成了噩梦。也因为一旦我计算了单词数(我指的是请求的第三点),我需要与另一个 RDD 进行连接,这在第一个版本中非常简单,而在第二个版本中则不是。
使用数据集API:
val tweets = sparkContext.textFile(DATASET)
val tokens = tweets.flatMap(extractor) //return RDD[(String, String)]
.toDF("type", "token") //type is one of ("hashtag", "emoticon", "lemma")
.groupBy("type", "token")
.count() //Dataset[Row] which has columns ("type", "token", "count")
val lemmas = tokens
.where($"type" === lit("lemma"))
.select("token", "count")
.as[(String, Long)]
.rdd //should be the same type as your original 'lemmas', for future join
也许是这样的?
sealed trait TokenType { }
object Hashtag extends TokenType
object Emoji extends TokenType
object Word extends TokenType
def extractTokens(tweet: String): Seq[(TokenType, String)] = {
...
}
val tokenCounts = tweets
.flatMap(extractTokens)
.map((_, 1))
.reduceByKey(_ + _)
val hashtagCounts = tokenCounts.collect { case ((Hashtag, x), count) => (x, count) }
// similar for emojis and words
鉴于我需要的大量推文数据集:
- 提取并计算主题标签。
- 提取并计算 emoticons/emojis。
- 提取并计算单词(引理)
所以,我首先想到的是做这样的事情:
val tweets = sparkContext.textFile(DATASET).cache
val hashtags = tweets
.map(extractHashTags)
.map((_, 1))
.reduceByKey(_ + _)
val emoticonsEmojis = tweets
.map(extractEmoticonsEmojis)
.map((_, 1))
.reduceByKey(_ + _)
val lemmas = tweets
.map(extractLemmas)
.map((_, 1))
.reduceByKey(_ + _)
但是这样每条推文都处理了3次,对不对?如果是这样,是否有一种有效的方法通过只处理每条推文一次来单独计算所有这些元素?
我在想这样的事情:
sparkContext.textFile(DATASET)
.map(extractor) // RDD[(List[String], List[String], List[String])]
但这样一来就变成了噩梦。也因为一旦我计算了单词数(我指的是请求的第三点),我需要与另一个 RDD 进行连接,这在第一个版本中非常简单,而在第二个版本中则不是。
使用数据集API:
val tweets = sparkContext.textFile(DATASET)
val tokens = tweets.flatMap(extractor) //return RDD[(String, String)]
.toDF("type", "token") //type is one of ("hashtag", "emoticon", "lemma")
.groupBy("type", "token")
.count() //Dataset[Row] which has columns ("type", "token", "count")
val lemmas = tokens
.where($"type" === lit("lemma"))
.select("token", "count")
.as[(String, Long)]
.rdd //should be the same type as your original 'lemmas', for future join
也许是这样的?
sealed trait TokenType { }
object Hashtag extends TokenType
object Emoji extends TokenType
object Word extends TokenType
def extractTokens(tweet: String): Seq[(TokenType, String)] = {
...
}
val tokenCounts = tweets
.flatMap(extractTokens)
.map((_, 1))
.reduceByKey(_ + _)
val hashtagCounts = tokenCounts.collect { case ((Hashtag, x), count) => (x, count) }
// similar for emojis and words