火花发现两个元素的余弦测量
Spark finding cosine measure of two elements
我有一个 .csv 文件,如下所示:
message_id, hashtag
id_1, hashtag_1
id_2, hashtag_1
...
id_k, hashtag_m
....
我试图通过执行以下操作在 csv 中的每对主题标签之间找到余弦度量:
def isHeader(s: String):Boolean = {
s.take(1) == "m"
}
def cs(pair: ( (String, String), (List[String], List[String]) ) ) = {
val msgs1 = pair._2._1.toSet
val msgs2 = pair._2._2.toSet
val numer = msgs1.intersect(msgs2).size
val denom = Math.sqrt(msgs1.size*msgs2.size)
(pair._1._1, pair._1._2, numer / denom)
}
def to_csv_cos(t: (String, String, Double)): String = {
t._1 ++ "," ++ t._2 ++ "," ++ t._3.toString
}
val messages = sc.textFile("....csv")
val msgData1 = messages.filter(x => !isHeader(x))
val data = msgData1.map(x => x.split(','))
val pairs = data.map(x => (x(0), List(x(1)))).reduceByKey((a,b) => a ++ b).flatMap(x => x._2.combinations(2).toList).map(x => (x(0), x(1)))
val msgs = data.map( x => (x(1), List(x(0)))).reduceByKey((a,b) => a++b)
val pairs_mapped = pairs.join(msgs).map{
case (x, (y,z)) => (y, (x,z))
}.join(msgs).map{
case (x, ( (y,z),t) ) => ( (x,y), (t,z) )
}
val res = pairs_mapped.map{
x => cs(x)
}.map(x => to_csv_cos(x)).saveAsTextFile("F:\Scala\result")
想法是:
- Creating pairs from the elements (pairs)
- Finding all messages for each hashtag (msgs)
- Creating pairs: ( (hashtag_i, hashtag_j), (messages_with_hashtag_i, messages_with_hashtag_j) ) (pairs_mapped)
- Calculating a measure(res)
好吧,我想我的代码只是垃圾,因为我是 scala、spark 和函数式编程概念的新手,但它适用于小型 csv(我尝试了 100 行)。但是我必须为大约 2500 万行的 csv 计算它,这就是问题所在。该过程在 saveAsTextFile(或 spark UI 中的 SparkHadoopWriter.scala)上停止,即使在 30 分钟内也不会继续,然后它会因不同的错误(内存错误,有时只是 'connection abort'.. .)
我在网站上发现,我们可以使用数据帧计算余弦度量,但我不太明白如何根据我的数据创建合适的数据帧。
所以,请问您能否给我一些建议,如何修改我的代码以使其正常工作,或者如何从 csv 或其他任何东西创建合适的数据框?
如果您能提供任何帮助,我将不胜感激!
这是机器内存不足时的典型行为。
我建议您尝试将工作量分成您的机器可以处理的批次。
你说100很容易。尝试 1000。这行得通吗?探索什么是可能的,然后相应地组织你的程序。
RDD 有惰性操作和主动操作。活动操作是 'saveToTextFile'、'persist'、'cash'、'collect'、'top'、'take'、'foreach'。惰性操作是 'map'、'filter'、'group'、'join' 等等。在您的程序中,几乎所有操作都是惰性的。惰性集合可以计算多次。您应该使用缓存,以便只计算一次。
当你使用大数据时,你应该节约使用内存。
如果你的标签或标识符是 Int,你应该使用 _.toInt。避免在未经测试的情况下将文本文件用于 non-learning 程序和目的。文本文件对于个人来说还不错,但对于个人电脑来说可能会很慢:例如,Double 占用 8 个字节,但是如果将 0.4082482904638631 写入文本文件,则此 Double 占用 18 个字符(36 字节)。另外,如果你的tag或者id是固定大小的,就可以不用逗号了。
抱歉我的英语不好。
type Id = String
type Tag = String
type Measure = Double
val messages: RDD[String] = sc.textFile("1.csv").filter(_.head != 'm')
val data: RDD[(Tag, Id)] = messages.map(line => line.split(","))
.map(pair => pair(1) -> pair(0))
val tagIds: RDD[(Tag, Set[Id])] = data.groupByKey()
.mapValues(_.toSet)
.persist(StorageLevel.MEMORY_AND_DISK)
val tagIds1TagIds2: RDD[((Tag, Set[Id]), (Tag, Set[Id]))] = tagIds.cartesian(tagIds).filter({
case ((t1,s1), (t2,s2)) => t1 < t2
})
val tagPairsWithMeasure: RDD[(Tag, Tag, Measure)] = tagIds1TagIds2.map({
case ((t1,l1), (t2,l2)) => (t1,t2, {
val numer = l1.intersect(l2).size
val denom = Math.sqrt(l1.size*l2.size)
numer.toDouble / denom
})
})
val lines: RDD[String] = tagPairsWithMeasure.map({
case (t1, t2, m) => s"$t1,$t2,$m"
})
测试:
id1,tag1
id1,tag2
id3,tag3
id3,tag2
id5,tag3
id6,tag1
id7,tag1
id8,tag2
答案:
tag2,tag3,0.4082482904638631 // 1/sqrt(3*2)
tag1,tag2,0.3333333333333333 // 1/sqrt(3*3)
tag1,tag3,0.0 // 0/sqrt(3*2)
我有一个 .csv 文件,如下所示:
message_id, hashtag
id_1, hashtag_1
id_2, hashtag_1
...
id_k, hashtag_m
....
我试图通过执行以下操作在 csv 中的每对主题标签之间找到余弦度量:
def isHeader(s: String):Boolean = {
s.take(1) == "m"
}
def cs(pair: ( (String, String), (List[String], List[String]) ) ) = {
val msgs1 = pair._2._1.toSet
val msgs2 = pair._2._2.toSet
val numer = msgs1.intersect(msgs2).size
val denom = Math.sqrt(msgs1.size*msgs2.size)
(pair._1._1, pair._1._2, numer / denom)
}
def to_csv_cos(t: (String, String, Double)): String = {
t._1 ++ "," ++ t._2 ++ "," ++ t._3.toString
}
val messages = sc.textFile("....csv")
val msgData1 = messages.filter(x => !isHeader(x))
val data = msgData1.map(x => x.split(','))
val pairs = data.map(x => (x(0), List(x(1)))).reduceByKey((a,b) => a ++ b).flatMap(x => x._2.combinations(2).toList).map(x => (x(0), x(1)))
val msgs = data.map( x => (x(1), List(x(0)))).reduceByKey((a,b) => a++b)
val pairs_mapped = pairs.join(msgs).map{
case (x, (y,z)) => (y, (x,z))
}.join(msgs).map{
case (x, ( (y,z),t) ) => ( (x,y), (t,z) )
}
val res = pairs_mapped.map{
x => cs(x)
}.map(x => to_csv_cos(x)).saveAsTextFile("F:\Scala\result")
想法是:
- Creating pairs from the elements (pairs)
- Finding all messages for each hashtag (msgs)
- Creating pairs: ( (hashtag_i, hashtag_j), (messages_with_hashtag_i, messages_with_hashtag_j) ) (pairs_mapped)
- Calculating a measure(res)
好吧,我想我的代码只是垃圾,因为我是 scala、spark 和函数式编程概念的新手,但它适用于小型 csv(我尝试了 100 行)。但是我必须为大约 2500 万行的 csv 计算它,这就是问题所在。该过程在 saveAsTextFile(或 spark UI 中的 SparkHadoopWriter.scala)上停止,即使在 30 分钟内也不会继续,然后它会因不同的错误(内存错误,有时只是 'connection abort'.. .)
我在网站上发现,我们可以使用数据帧计算余弦度量,但我不太明白如何根据我的数据创建合适的数据帧。
所以,请问您能否给我一些建议,如何修改我的代码以使其正常工作,或者如何从 csv 或其他任何东西创建合适的数据框?
如果您能提供任何帮助,我将不胜感激!
这是机器内存不足时的典型行为。
我建议您尝试将工作量分成您的机器可以处理的批次。
你说100很容易。尝试 1000。这行得通吗?探索什么是可能的,然后相应地组织你的程序。
RDD 有惰性操作和主动操作。活动操作是 'saveToTextFile'、'persist'、'cash'、'collect'、'top'、'take'、'foreach'。惰性操作是 'map'、'filter'、'group'、'join' 等等。在您的程序中,几乎所有操作都是惰性的。惰性集合可以计算多次。您应该使用缓存,以便只计算一次。
当你使用大数据时,你应该节约使用内存。 如果你的标签或标识符是 Int,你应该使用 _.toInt。避免在未经测试的情况下将文本文件用于 non-learning 程序和目的。文本文件对于个人来说还不错,但对于个人电脑来说可能会很慢:例如,Double 占用 8 个字节,但是如果将 0.4082482904638631 写入文本文件,则此 Double 占用 18 个字符(36 字节)。另外,如果你的tag或者id是固定大小的,就可以不用逗号了。
抱歉我的英语不好。
type Id = String
type Tag = String
type Measure = Double
val messages: RDD[String] = sc.textFile("1.csv").filter(_.head != 'm')
val data: RDD[(Tag, Id)] = messages.map(line => line.split(","))
.map(pair => pair(1) -> pair(0))
val tagIds: RDD[(Tag, Set[Id])] = data.groupByKey()
.mapValues(_.toSet)
.persist(StorageLevel.MEMORY_AND_DISK)
val tagIds1TagIds2: RDD[((Tag, Set[Id]), (Tag, Set[Id]))] = tagIds.cartesian(tagIds).filter({
case ((t1,s1), (t2,s2)) => t1 < t2
})
val tagPairsWithMeasure: RDD[(Tag, Tag, Measure)] = tagIds1TagIds2.map({
case ((t1,l1), (t2,l2)) => (t1,t2, {
val numer = l1.intersect(l2).size
val denom = Math.sqrt(l1.size*l2.size)
numer.toDouble / denom
})
})
val lines: RDD[String] = tagPairsWithMeasure.map({
case (t1, t2, m) => s"$t1,$t2,$m"
})
测试:
id1,tag1
id1,tag2
id3,tag3
id3,tag2
id5,tag3
id6,tag1
id7,tag1
id8,tag2
答案:
tag2,tag3,0.4082482904638631 // 1/sqrt(3*2)
tag1,tag2,0.3333333333333333 // 1/sqrt(3*3)
tag1,tag3,0.0 // 0/sqrt(3*2)