在 apache-spark RDD 中处理多个 'lines'

Process multiple 'lines' in apache-spark RDD

我是 Spark 的新手,我有一个问题。

我尝试用一​​些数据做一个简单的情绪分析。 在数据文件中,每一行都包含一个产品评论。

这是我处理一行的代码:

// wordlist
val pos_file = "/user/cloudera/Data/pos_list.txt"
val neg_file = "/user/cloudera/Data/neg_list.txt"

val pos_words = sc.textFile(pos_file).cache().collect().toSet
val neg_words = sc.textFile(neg_file).cache().collect().toSet


val test_string = "Line with positive or negative review."
val test_rdd = sc.parallelize(List(test_string))

val test_rdd2 = test_rdd.flatMap(line => "[a-zA-Z]+".r findAllIn line map (_.toLowerCase) )

val pos = test_rdd2.filter(x => pos_words contains x)
val neg = test_rdd2.filter(x => neg_words contains x)

我现在的问题是如何处理rdd中的每条记录(在本例中为3):

val file_in = "/user/cloudera/Data/teststring.txt"
val data = sc.textFile(file_in).cache()
val reviews = data.flatMap(_.split("\n"))

scala> reviews.count()
res29: Long = 3

以下代码

val reviews2 = reviews.flatMap(line => "[a-zA-Z]+".r findAllIn line map (_.toLowerCase))

给我所有的话。 我想获得每个 line/review 的 pos 和 neg 的值。 计算非常简单:如果一个词在 pos_words/neg_words 的集合中,则将其放入 pos/neg 中。其实我只是统计肯定词或否定词的出现次数。

如何获得类似 ('line'、'posvalue'、'negvalue') 的内容?

非常感谢

试图总结这个问题:我们想计算输入文件每一行中特定单词(正面、负面)出现的次数 'normalized':所有字母字符和空格均为小写。

假设我已经有了两组正面和负面的词:

val posWords: Set[String] = ???
val negWords: Set[String] = ???

以及每行 1 条记录的输入文件:

val data = sc.textFile("data.txt")

我们想要以下形式的结果:

(text, posCount, negCount)

让我们首先定义一个辅助函数,让我们计算字符串序列中的单词与一组字符串的匹配次数:

def matches(text:Seq[String], words:Set[String]):Int =
text.map(w => if (words.contains(w)) 1 else 0).sum 

最后,我们将每一行转换为正匹配和负匹配的计数。

val posNegData  = data.map{line => 
    val cleanLine = line.toLowerCase.split("\W")
    (line, matches(cleanLine, posWords), matches(cleanLine, negWords))
}

这里我们假设预期结果是正负匹配的原始字符串。 (这个从原题看不清楚)