优化必须计算每个条目相似度并为每个条目输出前 N 个相似项的 Spark 作业

Optimize Spark job that has to calculate each to each entry similarity and output top N similar items for each

我有一个 Spark 作业需要计算电影 content-based 相似度。有 46k 部电影。每部电影都由一组 SparseVectors 表示(每个向量都是电影的一个字段的特征向量,例如标题、情节、流派、演员等)。例如,对于演员和流派,向量显示给定演员在电影中是出现 (1) 还是缺席 (0)。

任务是为每部电影找出前 10 部相似的电影。 我设法用 Scala 编写了一个脚本来执行所有这些计算并完成工作。它适用于较小的电影集,例如 1000 部电影,但不适用于整个数据集(内存不足等)。

我执行此计算的方法是对电影数据集使用交叉连接。然后通过只获取 movie1_id < movie2_id 的行来减少问题。 此时的数据集仍将包含 46000^2/2 行,即 1058000000。 每行都有大量数据。

然后我计算每一行的相似度分数。计算相似度后,我将 movie1_id 相同的结果分组,并使用 Window 函数获取前 N 个项目(类似于此处描述的方式: 按相似度得分降序排序) ).

问题是 - 在 Spark 中可以更有效地完成吗?例如。无需执行交叉连接?

另一个问题 - Spark 如何处理如此巨大的 Dataframes(1058000000 行由多个 SparseVectors 组成)?是否必须一次将所有这些都保存在内存中?或者它是否以某种方式逐个处理这些数据帧?


我正在使用以下函数来计算电影向量之间的相似度:

def intersectionCosine(movie1Vec: SparseVector, movie2Vec: SparseVector): Double = {
val a: BSV[Double] = toBreeze(movie1Vec)
val b: BSV[Double] = toBreeze(movie2Vec)

var dot: Double = 0
var offset: Int = 0
while( offset < a.activeSize) {
  val index: Int = a.indexAt(offset)
  val value: Double = a.valueAt(offset)

  dot += value * b(index)
  offset += 1
}

val bReduced: BSV[Double] = new BSV(a.index, a.index.map(i => b(i)), a.index.length)
val maga: Double = magnitude(a)
val magb: Double = magnitude(bReduced)

if (maga == 0 || magb == 0)
  return 0
else
  return dot / (maga * magb)
}

Dataframe 中的每一行都包含两个连接的 类:

final case class MovieVecData(imdbID: Int,
                          Title: SparseVector,
                          Decade: SparseVector,
                          Plot: SparseVector,
                          Genres: SparseVector,
                          Actors: SparseVector,
                          Countries: SparseVector,
                          Writers: SparseVector,
                          Directors: SparseVector,
                          Productions: SparseVector,
                          Rating: Double
                         )

它可以更有效地完成,只要您对近似值没问题,并且不需要精确的结果(或精确的数字或​​结果)。

与我对 Efficient string matching in Apache Spark 的回答类似,您可以使用 LSH,其中:

如果特征space很小(或者可以合理减少)并且每个类别都比较小你也可以手动优化你的代码:

  • explode 特征数组,用于从单个记录生成#features 记录。
  • 按特征自连接结果,计算距离并过滤掉候选者(当且仅当它们共享特定的分类特征时,才会比较每对记录)。
  • 使用您当前的代码获取最高记录。

一个最小的例子是(将其视为伪代码):

import org.apache.spark.ml.linalg._

// This is oversimplified. In practice don't assume only sparse scenario
val indices = udf((v: SparseVector) => v.indices)

val df = Seq(
  (1L, Vectors.sparse(1024, Array(1, 3, 5), Array(1.0, 1.0, 1.0))),
  (2L, Vectors.sparse(1024, Array(3, 8, 12), Array(1.0, 1.0, 1.0))),
  (3L, Vectors.sparse(1024, Array(3, 5), Array(1.0, 1.0))),
  (4L, Vectors.sparse(1024, Array(11, 21), Array(1.0, 1.0))),
  (5L, Vectors.sparse(1024, Array(21, 32), Array(1.0, 1.0)))
).toDF("id", "features")

val possibleMatches = df
  .withColumn("key", explode(indices($"features")))
  .transform(df => df.alias("left").join(df.alias("right"), Seq("key")))

val closeEnough(threshold: Double) = udf((v1: SparseVector, v2: SparseVector) =>  intersectionCosine(v1, v2) > threshold)

possilbeMatches.filter(closeEnough($"left.features", $"right.features")).select($"left.id", $"right.id").distinct

请注意,只有当散列/特征足够有选择性(并且最佳稀疏)时,这两种解决方案才值得开销。在上面显示的示例中,您将只比较集合 {1, 2, 3} 和 {4, 5} 内的行,而不是集合之间的行。

然而在最坏的情况下(M条记录,N个特征)我们可以进行N M2比较,而不是M2

你可以借鉴局部敏感散列的思想。这是一种方法:

  • 根据您的匹配要求定义一组哈希键。您将使用这些键来查找潜在的匹配项。例如,一个可能的散列键可以基于电影演员向量。
  • 对每个键执行归约。这将给出一组潜在的匹配项。对于每个可能的匹配集,执行您的 "exact match"。完全匹配将产生完全匹配集。
  • 运行 连通分量算法执行集合合并以获得所有精确匹配的集合。

我已经使用上述方法实现了类似的东西。

希望对您有所帮助。

另一种可能的解决方案是使用内置 RowMatrix 和强力 columnSimilarity,如数据块上所述:

https://databricks.com/blog/2014/10/20/efficient-similarity-algorithm-now-in-spark-twitter.html

https://datascience.stackexchange.com/questions/14862/spark-item-similarity-recommendation

备注:

  • 请记住,在生成的相似性矩阵中总是有 N^2 个值
  • 您必须连接稀疏向量

另一种想法..鉴于您的矩阵相对较小且稀疏,它可以使用 breeze CSCMatrix[Int].

放入内存中

然后,您可以使用 A'B (A.transposed * B) 计算 co-occurrences,然后选择每对的 LLR(对数似然比)的 TopN。在这里,由于每行仅保留 10 个顶部项目,因此输出矩阵也将非常稀疏。

您可以在这里查看详细信息:

https://github.com/actionml/universal-recommender

我在类似场景中使用过的一个非常重要的建议是如果某部电影

relation     similarity score
A-> B        8/10
B->C         7/10
C->D         9/10

If 

E-> A       4  //less that some threshold or hyperparameter
Don't calculate similarity for
E-> B
E-> C 
E->D