Spark computation for suggesting new friendships
I'm using Spark for fun and to learn new things about MapReduce. So I'm trying to write a program that suggests new friendships (i.e., a kind of recommendation system). A friendship between two people is suggested when they are not yet connected and have many friends in common.
The friendship text file has a structure like the following:
1 2,4,11,12,15
2 1,3,4,5,9,10
3 2,5,11,15,20,21
4 1,2,3
5 2,3,4,15,16
...
The syntax is: ID_SRC1<TAB>ID_DST1,ID_DST2,...
The program should output (print or write to a text file) something like this:
1 3,5
3 1
5 1
...
The syntax is: ID_SRC1<TAB>ID_SUGG1,ID_SUGG2,...
Of course, the program must suggest a friendship only if the two people share a minimum number of friends, let's say 3 in our case.
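To make the target format concrete, here is a tiny, hypothetical plain-Python sketch (format_line is just an illustrative name) of how one result pair maps to an output line:
def format_line(src, suggestions):
    # "ID_SRC<TAB>ID_SUGG1,ID_SUGG2,..."
    return str(src) + '\t' + ','.join(str(s) for s in suggestions)

print(format_line(1, [3, 5]))  # -> "1\t3,5"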
I have written my program, but I would like to read better and more powerful solutions from you. Indeed, I think my code can be improved a lot, since it takes a long time to produce the output from a 4.2 MB input file.
Below is my code:
from pyspark import SparkContext, SparkConf

def linesToDataset(line):
    (src, dst_line) = line.split('\t')
    src = int(src.strip())

    dst_list_string = dst_line.split(',')
    dst_list = [int(x.strip()) for x in dst_list_string if x != '']

    return (src, dst_list)

def filterPairs(x):
    # don't take into account pairs of a same node and pairs of already friends
    if (x[0][0] != x[1][0]) and (not x[0][0] in x[1][1]) and (not x[1][0] in x[0][1]):
        shared = len(list(set(x[0][1]).intersection(set(x[1][1]))))
        return (x[0][0], [x[1][0], shared])

def mapFinalDataset(elem):
    recommendations = []
    src = elem[0]
    dst_commons = elem[1]
    for pair in dst_commons:
        if pair[1] > 3: # 3 is the minimum number of friends in common
            recommendations.append(pair[0])
    return (src, recommendations)

def main():
    conf = SparkConf().setAppName("Recommendation System").setMaster("local[4]")
    sc = SparkContext(conf=conf)

    rdd = sc.textFile("data.txt")
    dataset = rdd.map(linesToDataset)

    cartesian = dataset.cartesian(dataset)

    filteredDatasetRaw = cartesian.map(filterPairs)
    filteredDataset = filteredDatasetRaw.filter(lambda x: x != None)
    # print filteredDataset.take(10)

    groupedDataset = filteredDataset.groupByKey().mapValues(list)
    # print groupedDataset.take(10)

    finalDataset = groupedDataset.map(mapFinalDataset)
    output = finalDataset.take(100)

    for (k, v) in output:
        if len(v) > 0:
            print str(k) + ': ' + str(v)

    sc.stop()

if __name__ == "__main__":
    main()
Better is, of course, a matter of opinion.
I do think the strategy I am about to propose is better in terms of performance and readability, but that has to be subjective. The main reason is that I avoid the cartesian product and replace it with a JOIN.
Alternative strategy
Description
The strategy I propose is based on the fact that the basic data line
1 2,4,11,12,15
can be considered as a list of "friendship suggestions", meaning this line tells me: "2 should be friends with 4, 11, 12, 15", "4 should be friends with 2, 11, 12, 15", and so on.
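As a small illustration (a hypothetical plain-Python sketch, not part of the Spark job), the symmetric suggestion pairs produced by that single line would be:
from itertools import combinations

friends = [2, 4, 11, 12, 15]  # the friend list from the line "1<TAB>2,4,11,12,15"

# every unordered pair of friends yields two directed suggestions
suggestions = [p for a, b in combinations(friends, 2) for p in ((a, b), (b, a))]
print(suggestions[:4])  # [(2, 4), (4, 2), (2, 11), (11, 2)]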
Therefore, the gist of my implementation is to:
- Turn each line into a list of suggestions (foo should be friends with bar)
- Group the suggestions by person (foo should be friends with bar, baz, bar), keeping duplicates
- Count the duplicates (foo should be friends with bar (2 suggestions) and baz (1 suggestion))
- Remove the existing relationships
- Filter out suggestions that occur too rarely
- Print the result
Implementation
Since I am more of a Java/Scala person, please forgive the Scala, but it should map easily to Python (a rough PySpark sketch of the same pipeline is given at the end of this answer).
First, create the basic friendship data from your text file:
def parseLine(line: String): (Int, Array[String]) = {
  (Integer.parseInt(line.substring(0, line.indexOf("\t"))), line.substring(line.indexOf("\t") + 1).split(","))
}

def toIntegerArray(strings: Array[String]): Array[Int] = {
  strings.filter({ x => !x.isEmpty() }).map({ x => Integer.parseInt(x) })
}

// The friendships that already exist
val alreadyFriendsRDD = sc.textFile("src/data.txt", 4)
  // Parse the file: (id of the person, Array[Int] of friends)
  .map { parseLine }
  .mapValues( toIntegerArray );
And turn them into suggestions:
// If person 1 is friends with 2 and 4, this means we should suggest 2 to become friends with 4, and vice versa
def toSymetricalPairs(suggestions: Array[Int]): TraversableOnce[(Int, Int)] = {
  suggestions.combinations(2)
    .map { suggestion => (suggestion(0), suggestion(1)) }
    .flatMap { suggestion => Iterator(suggestion, (suggestion._2, suggestion._1)) }
}

val suggestionsRDD = alreadyFriendsRDD
  .map( x => x._2 )
  // Then we create suggestions from the friends Array
  .flatMap( toSymetricalPairs )
Once you have an RDD of suggestions, regroup them:
import scala.collection.mutable

def mergeSuggestion(suggestions: mutable.HashMap[Int, Int], newSuggestion: Int): mutable.HashMap[Int, Int] = {
  suggestions.get(newSuggestion) match {
    case None => suggestions.put(newSuggestion, 1)
    case Some(x) => suggestions.put(newSuggestion, x + 1)
  }
  suggestions
}

def mergeSuggestions(suggestions: mutable.HashMap[Int, Int], toMerge: mutable.HashMap[Int, Int]) = {
  val keySet = suggestions.keySet ++: toMerge.keySet
  keySet.foreach { key =>
    suggestions.get(key) match {
      case None => suggestions.put(key, toMerge.getOrElse(key, 0))
      case Some(x) => suggestions.put(key, x + toMerge.getOrElse(key, 0))
    }
  }
  suggestions
}

def filterRareSuggestions(suggestions: mutable.HashMap[Int, Int]): scala.collection.Set[Int] = {
  suggestions.filter(p => p._2 >= 3).keySet
}

// We reduce to an RDD of suggestion counts by person
val suggestionsByPersonRDD = suggestionsRDD.combineByKey(
  // For each person, create a map of suggestion counts, seeded with the first suggestion seen
  (firstSuggestion: Int) => mutable.HashMap(firstSuggestion -> 1),
  // For every further suggestion, merge it into the map
  mergeSuggestion,
  // When merging two maps, sum the suggestion counts
  mergeSuggestions
)
// We keep only suggestions that occur at least 3 times
.mapValues( filterRareSuggestions )
Finally, filter the suggestions by taking the already existing friendships into account:
val suggestionsCleanedRDD = suggestionsByPersonRDD
  // We co-locate the suggestions with the known friends
  .join(alreadyFriendsRDD)
  // We clean the suggestions by removing the known friends
  .mapValues { case (suggestions, alreadyKnownFriendsByPerson) =>
    suggestions -- alreadyKnownFriendsByPerson
  }
Which outputs, for example:
(49831,Set(49853, 49811, 49837, 49774))
(49835,Set(22091, 20569, 29540, 36583, 31122, 3004, 10390, 4113, 1137, 15064, 28563, 20596, 36623))
(49839,Set())
(49843,Set(49844))
Meaning that 49831 should become friends with 49853, 49811, 49837 and 49774.
Speed
Trying it on your dataset, on a 2012 Core i5 @ 2.8 GHz (dual core with hyper-threading) with 2 GB of RAM, it finishes in 1.5 minutes.
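And since your code is in PySpark, here is a rough, untested sketch of how the same pipeline could look in Python. The names are my own, it counts pairs with reduceByKey instead of combineByKey for brevity, and it assumes the same data.txt layout and threshold of 3:
from itertools import combinations
from pyspark import SparkContext, SparkConf

MIN_COMMON_FRIENDS = 3

def parse_line(line):
    src, dst_line = line.split('\t')
    return (int(src), [int(x) for x in dst_line.split(',') if x.strip() != ''])

def to_symmetrical_pairs(friends):
    # every unordered pair of friends yields two directed suggestions
    for a, b in combinations(friends, 2):
        yield (a, b)
        yield (b, a)

conf = SparkConf().setAppName("Recommendation System").setMaster("local[4]")
sc = SparkContext(conf=conf)

already_friends = sc.textFile("data.txt").map(parse_line)

# (person, suggested friend) pairs; duplicates mean several friends in common
suggestions = already_friends.values().flatMap(to_symmetrical_pairs)

# count how many common friends back each (person, suggestion) pair
suggestion_counts = suggestions.map(lambda pair: (pair, 1)).reduceByKey(lambda a, b: a + b)

# keep the frequent suggestions and regroup them by person
frequent = (suggestion_counts
            .filter(lambda kv: kv[1] >= MIN_COMMON_FRIENDS)
            .map(lambda kv: (kv[0][0], kv[0][1]))
            .groupByKey()
            .mapValues(set))

# remove people who are already friends, via a join instead of a cartesian product
cleaned = (frequent
           .join(already_friends)
           .mapValues(lambda v: v[0] - set(v[1])))

for person, suggested in cleaned.take(100):
    print(str(person) + '\t' + ','.join(str(s) for s in sorted(suggested)))

sc.stop()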