用于建议新友谊的 Spark 计算

Spark computation for suggesting new friendships

我使用 Spark 是为了好玩,也是为了学习有关 MapReduce 的新知识。所以,我正在尝试编写一个建议新友谊的程序(即一种推荐系统)。如果两个人还没有连接并且有很多共同的朋友,则执行两个人之间的友谊建议。

友谊文本文件的结构类似于以下内容:

1   2,4,11,12,15
2   1,3,4,5,9,10
3   2,5,11,15,20,21
4   1,2,3
5   2,3,4,15,16
...

语法为:ID_SRC1<TAB>ID_DST1,ID_DST2,....

程序应该输出(打印或文本文件)如下所示:

1   3,5
3   1
5   1
...

语法为:ID_SRC1<TAB>ID_SUGG1,ID_SUGG2,...。当然,如果两个人共享最少数量的朋友,那么程序必须建议建立友谊,在我们的例子中假设 3

我已经编写了我的程序,但我想阅读您提供的更好、更强大的解决方案。事实上,我认为我的代码可以改进很多,因为从 4.2 MB 的输入文件输出需要很多时间。

下面是我的代码:

from pyspark import SparkContext, SparkConf

def linesToDataset(line):
    (src, dst_line) = line.split('\t')
    src = int(src.strip())

    dst_list_string = dst_line.split(',')
    dst_list = [int(x.strip()) for x in dst_list_string if x != '']

    return (src, dst_list)  

def filterPairs(x):
     # don't take into account pairs of a same node and pairs of already friends
    if (x[0][0] != x[1][0]) and (not x[0][0] in x[1][1]) and (not x[1][0] in x[0][1]):
        shared = len(list(set(x[0][1]).intersection(set(x[1][1]))))
        return (x[0][0], [x[1][0], shared])

def mapFinalDataset(elem):
    recommendations = []
    src = elem[0]
    dst_commons = elem[1]
    for pair in dst_commons:
        if pair[1] > 3: # 3 is the minimum number of friends in common
            recommendations.append(pair[0])
    return (src, recommendations)

def main():
    conf = SparkConf().setAppName("Recommendation System").setMaster("local[4]")
    sc = SparkContext(conf=conf)
    rdd = sc.textFile("data.txt")

    dataset = rdd.map(linesToDataset)

    cartesian = dataset.cartesian(dataset)
    filteredDatasetRaw = cartesian.map(filterPairs)
    filteredDataset = filteredDatasetRaw.filter(lambda x: x != None)
#   print filteredDataset.take(10)

    groupedDataset = filteredDataset.groupByKey().mapValues(list)
#   print groupedDataset.take(10)

    finalDataset = groupedDataset.map(mapFinalDataset)
    output = finalDataset.take(100)
    for (k, v) in output:
        if len(v) > 0:
            print str(k) + ': ' + str(v)

    sc.stop()


if __name__ == "__main__":
    main()

更好的当然是一种观点。

我认为我即将提出的策略在性能和可读性方面更好,但这必须是主观的。主要原因是我避免了笛卡尔积,用 JOIN 代替它。

备选策略

描述

我提出的策略是基于基础数据线

1   2,4,11,12,15

可以被认为是 "friendship suggestions" 的列表,意思是这一行告诉我:“2 应该与 4, 11, 12, 15 成为朋友”,“4 应该与 2, 11, 12、15" 等等。

因此,我的实现要点是

  1. 将每一行变成一个建议列表(foo 应该是 bar 的朋友)
  2. 按人分组建议(foo 应该是 bar、baz、bar 的朋友)重复项
  3. 计算重复次数(foo 应该是 bar(2 个建议),baz(1 个建议)的朋友
  4. 删除现有关系
  5. 过滤很少出现的建议
  6. 打印结果

实施

因为我更像是一个 Java/scala 人,请原谅 scala 语言,但它应该很容易映射到 Python。

首先,从您的文本文件创建基本的友谊数据

def parseLine(line: String): (Int, Array[String]) = {
  (Integer.parseInt(line.substring(0, line.indexOf("\t"))), line.substring(line.indexOf("\t")+1).split(","))
}
def toIntegerArray(strings: Array[String]): Array[Int] = { 
  strings.filter({ x => !x.isEmpty() }).map({ x => Integer.parseInt(x) }) 
}
// The friendships that exist
val alreadyFriendsRDD = sc.textFile("src/data.txt", 4)
        // Parse file : (id of the person, [Int] of friends)
        .map { parseLine }
        .mapValues( toIntegerArray );

并将它们转化为建议

// If person 1 is friends with 2 and 4, this means we should suggest 2 to become friends with 4 , and vice versa
def toSymetricalPairs(suggestions: Array[Int]): TraversableOnce[(Int, Int)] = {
  suggestions.combinations(2)
             .map { suggestion => (suggestion(0), suggestion(1)) }
             .flatMap { suggestion => Iterator(suggestion, (suggestion._2, suggestion._1)) }
}
val suggestionsRDD = alreadyFriendsRDD
  .map( x => x._2 )
  // Then we create suggestions from the friends Array
  .flatMap( toSymetricalPairs ) 

一旦你有一个 RDD 的建议,重新组合它们:

def mergeSuggestion(suggestions: mutable.HashMap[Int, Int], newSuggestion: Int): mutable.HashMap[Int, Int] = {
  suggestions.get(newSuggestion) match {
    case None => suggestions.put(newSuggestion, 1)
    case Some(x) => suggestions.put(newSuggestion, x + 1)
  }
  suggestions
}
def mergeSuggestions(suggestions: mutable.HashMap[Int, Int], toMerge: mutable.HashMap[Int, Int]) = {
  val keySet = suggestions.keySet ++: toMerge.keySet
  keySet.foreach { key =>
    suggestions.get(key) match {
      case None => suggestions.put(key, toMerge.getOrElse(key, 0))
      case Some(x) => suggestions.put(key, x + toMerge.getOrElse(key, 0))
    }
  }
  suggestions
}

def filterRareSuggestions(suggestions: mutable.HashMap[Int, Int]): scala.collection.Set[Int] = {
  suggestions.filter(p => p._2 >= 3).keySet
}

// We reduce as a RDD of suggestion count by person
val suggestionsByPersonRDD = suggestionsRDD.combineByKey(
    // For each person, create a map of suggestion count
    (person: Int) => new mutable.HashMap[Int, Int](),           
    // For every suggestion, merge it into the map
    mergeSuggestion , 
    // When merging two maps, sum the suggestions
    mergeSuggestions
    )
    // We restrict to suggestions that occur more than 3 times
    .mapValues( filterRareSuggestions )

最终通过考虑已有的好友来过滤建议

val suggestionsCleanedRDD = suggestionsByPersonRDD
  // We co-locate the suggestions with the known friends
  .join(alreadyFriendsRDD)
  // We clean the suggestions by removing the known friends
  .mapValues (_ match { case (suggestions, alreadyKnownFriendsByPerson) => {
    suggestions -- alreadyKnownFriendsByPerson
  }})

输出,例如:

(49831,Set(49853, 49811, 49837, 49774))
(49835,Set(22091, 20569, 29540, 36583, 31122, 3004, 10390, 4113, 1137, 15064, 28563, 20596, 36623))
(49839,Set())
(49843,Set(49844))

意思是49831应该和49853、49811、49837、49774成为朋友。

速度

在您的数据集上进行尝试,在 2012 Corei5@2.8GHz(双核超线程)/2g RAM 上,我们在 1.5 分钟内完成。