优化必须计算每个条目相似度并为每个条目输出前 N 个相似项的 Spark 作业
Optimize Spark job that has to calculate each to each entry similarity and output top N similar items for each
我有一个 Spark 作业需要计算电影 content-based 相似度。有 46k 部电影。每部电影都由一组 SparseVectors 表示(每个向量都是电影的一个字段的特征向量,例如标题、情节、流派、演员等)。例如,对于演员和流派,向量显示给定演员在电影中是出现 (1) 还是缺席 (0)。
任务是为每部电影找出前 10 部相似的电影。
我设法用 Scala 编写了一个脚本来执行所有这些计算并完成工作。它适用于较小的电影集,例如 1000 部电影,但不适用于整个数据集(内存不足等)。
我执行此计算的方法是对电影数据集使用交叉连接。然后通过只获取 movie1_id < movie2_id 的行来减少问题。
此时的数据集仍将包含 46000^2/2 行,即 1058000000。
每行都有大量数据。
然后我计算每一行的相似度分数。计算相似度后,我将 movie1_id 相同的结果分组,并使用 Window 函数获取前 N 个项目(类似于此处描述的方式: 按相似度得分降序排序) ).
问题是 - 在 Spark 中可以更有效地完成吗?例如。无需执行交叉连接?
另一个问题 - Spark 如何处理如此巨大的 Dataframes(1058000000 行由多个 SparseVectors 组成)?是否必须一次将所有这些都保存在内存中?或者它是否以某种方式逐个处理这些数据帧?
我正在使用以下函数来计算电影向量之间的相似度:
def intersectionCosine(movie1Vec: SparseVector, movie2Vec: SparseVector): Double = {
val a: BSV[Double] = toBreeze(movie1Vec)
val b: BSV[Double] = toBreeze(movie2Vec)
var dot: Double = 0
var offset: Int = 0
while( offset < a.activeSize) {
val index: Int = a.indexAt(offset)
val value: Double = a.valueAt(offset)
dot += value * b(index)
offset += 1
}
val bReduced: BSV[Double] = new BSV(a.index, a.index.map(i => b(i)), a.index.length)
val maga: Double = magnitude(a)
val magb: Double = magnitude(bReduced)
if (maga == 0 || magb == 0)
return 0
else
return dot / (maga * magb)
}
Dataframe 中的每一行都包含两个连接的 类:
final case class MovieVecData(imdbID: Int,
Title: SparseVector,
Decade: SparseVector,
Plot: SparseVector,
Genres: SparseVector,
Actors: SparseVector,
Countries: SparseVector,
Writers: SparseVector,
Directors: SparseVector,
Productions: SparseVector,
Rating: Double
)
它可以更有效地完成,只要您对近似值没问题,并且不需要精确的结果(或精确的数字或结果)。
与我对 Efficient string matching in Apache Spark 的回答类似,您可以使用 LSH,其中:
BucketedRandomProjectionLSH
近似欧氏距离。
MinHashLSH
近似 Jaccard 距离。
如果特征space很小(或者可以合理减少)并且每个类别都比较小你也可以手动优化你的代码:
explode
特征数组,用于从单个记录生成#features 记录。
- 按特征自连接结果,计算距离并过滤掉候选者(当且仅当它们共享特定的分类特征时,才会比较每对记录)。
- 使用您当前的代码获取最高记录。
一个最小的例子是(将其视为伪代码):
import org.apache.spark.ml.linalg._
// This is oversimplified. In practice don't assume only sparse scenario
val indices = udf((v: SparseVector) => v.indices)
val df = Seq(
(1L, Vectors.sparse(1024, Array(1, 3, 5), Array(1.0, 1.0, 1.0))),
(2L, Vectors.sparse(1024, Array(3, 8, 12), Array(1.0, 1.0, 1.0))),
(3L, Vectors.sparse(1024, Array(3, 5), Array(1.0, 1.0))),
(4L, Vectors.sparse(1024, Array(11, 21), Array(1.0, 1.0))),
(5L, Vectors.sparse(1024, Array(21, 32), Array(1.0, 1.0)))
).toDF("id", "features")
val possibleMatches = df
.withColumn("key", explode(indices($"features")))
.transform(df => df.alias("left").join(df.alias("right"), Seq("key")))
val closeEnough(threshold: Double) = udf((v1: SparseVector, v2: SparseVector) => intersectionCosine(v1, v2) > threshold)
possilbeMatches.filter(closeEnough($"left.features", $"right.features")).select($"left.id", $"right.id").distinct
请注意,只有当散列/特征足够有选择性(并且最佳稀疏)时,这两种解决方案才值得开销。在上面显示的示例中,您将只比较集合 {1, 2, 3} 和 {4, 5} 内的行,而不是集合之间的行。
然而在最坏的情况下(M条记录,N个特征)我们可以进行N M2比较,而不是M2
你可以借鉴局部敏感散列的思想。这是一种方法:
- 根据您的匹配要求定义一组哈希键。您将使用这些键来查找潜在的匹配项。例如,一个可能的散列键可以基于电影演员向量。
- 对每个键执行归约。这将给出一组潜在的匹配项。对于每个可能的匹配集,执行您的 "exact match"。完全匹配将产生完全匹配集。
- 运行 连通分量算法执行集合合并以获得所有精确匹配的集合。
我已经使用上述方法实现了类似的东西。
希望对您有所帮助。
另一种可能的解决方案是使用内置 RowMatrix 和强力 columnSimilarity,如数据块上所述:
https://databricks.com/blog/2014/10/20/efficient-similarity-algorithm-now-in-spark-twitter.html
https://datascience.stackexchange.com/questions/14862/spark-item-similarity-recommendation
备注:
- 请记住,在生成的相似性矩阵中总是有 N^2 个值
- 您必须连接稀疏向量
另一种想法..鉴于您的矩阵相对较小且稀疏,它可以使用 breeze CSCMatrix[Int].
放入内存中
然后,您可以使用 A'B (A.transposed * B) 计算 co-occurrences,然后选择每对的 LLR(对数似然比)的 TopN。在这里,由于每行仅保留 10 个顶部项目,因此输出矩阵也将非常稀疏。
您可以在这里查看详细信息:
我在类似场景中使用过的一个非常重要的建议是如果某部电影
relation similarity score
A-> B 8/10
B->C 7/10
C->D 9/10
If
E-> A 4 //less that some threshold or hyperparameter
Don't calculate similarity for
E-> B
E-> C
E->D
我有一个 Spark 作业需要计算电影 content-based 相似度。有 46k 部电影。每部电影都由一组 SparseVectors 表示(每个向量都是电影的一个字段的特征向量,例如标题、情节、流派、演员等)。例如,对于演员和流派,向量显示给定演员在电影中是出现 (1) 还是缺席 (0)。
任务是为每部电影找出前 10 部相似的电影。 我设法用 Scala 编写了一个脚本来执行所有这些计算并完成工作。它适用于较小的电影集,例如 1000 部电影,但不适用于整个数据集(内存不足等)。
我执行此计算的方法是对电影数据集使用交叉连接。然后通过只获取 movie1_id < movie2_id 的行来减少问题。 此时的数据集仍将包含 46000^2/2 行,即 1058000000。 每行都有大量数据。
然后我计算每一行的相似度分数。计算相似度后,我将 movie1_id 相同的结果分组,并使用 Window 函数获取前 N 个项目(类似于此处描述的方式:
问题是 - 在 Spark 中可以更有效地完成吗?例如。无需执行交叉连接?
另一个问题 - Spark 如何处理如此巨大的 Dataframes(1058000000 行由多个 SparseVectors 组成)?是否必须一次将所有这些都保存在内存中?或者它是否以某种方式逐个处理这些数据帧?
我正在使用以下函数来计算电影向量之间的相似度:
def intersectionCosine(movie1Vec: SparseVector, movie2Vec: SparseVector): Double = {
val a: BSV[Double] = toBreeze(movie1Vec)
val b: BSV[Double] = toBreeze(movie2Vec)
var dot: Double = 0
var offset: Int = 0
while( offset < a.activeSize) {
val index: Int = a.indexAt(offset)
val value: Double = a.valueAt(offset)
dot += value * b(index)
offset += 1
}
val bReduced: BSV[Double] = new BSV(a.index, a.index.map(i => b(i)), a.index.length)
val maga: Double = magnitude(a)
val magb: Double = magnitude(bReduced)
if (maga == 0 || magb == 0)
return 0
else
return dot / (maga * magb)
}
Dataframe 中的每一行都包含两个连接的 类:
final case class MovieVecData(imdbID: Int,
Title: SparseVector,
Decade: SparseVector,
Plot: SparseVector,
Genres: SparseVector,
Actors: SparseVector,
Countries: SparseVector,
Writers: SparseVector,
Directors: SparseVector,
Productions: SparseVector,
Rating: Double
)
它可以更有效地完成,只要您对近似值没问题,并且不需要精确的结果(或精确的数字或结果)。
与我对 Efficient string matching in Apache Spark 的回答类似,您可以使用 LSH,其中:
BucketedRandomProjectionLSH
近似欧氏距离。MinHashLSH
近似 Jaccard 距离。
如果特征space很小(或者可以合理减少)并且每个类别都比较小你也可以手动优化你的代码:
explode
特征数组,用于从单个记录生成#features 记录。- 按特征自连接结果,计算距离并过滤掉候选者(当且仅当它们共享特定的分类特征时,才会比较每对记录)。
- 使用您当前的代码获取最高记录。
一个最小的例子是(将其视为伪代码):
import org.apache.spark.ml.linalg._
// This is oversimplified. In practice don't assume only sparse scenario
val indices = udf((v: SparseVector) => v.indices)
val df = Seq(
(1L, Vectors.sparse(1024, Array(1, 3, 5), Array(1.0, 1.0, 1.0))),
(2L, Vectors.sparse(1024, Array(3, 8, 12), Array(1.0, 1.0, 1.0))),
(3L, Vectors.sparse(1024, Array(3, 5), Array(1.0, 1.0))),
(4L, Vectors.sparse(1024, Array(11, 21), Array(1.0, 1.0))),
(5L, Vectors.sparse(1024, Array(21, 32), Array(1.0, 1.0)))
).toDF("id", "features")
val possibleMatches = df
.withColumn("key", explode(indices($"features")))
.transform(df => df.alias("left").join(df.alias("right"), Seq("key")))
val closeEnough(threshold: Double) = udf((v1: SparseVector, v2: SparseVector) => intersectionCosine(v1, v2) > threshold)
possilbeMatches.filter(closeEnough($"left.features", $"right.features")).select($"left.id", $"right.id").distinct
请注意,只有当散列/特征足够有选择性(并且最佳稀疏)时,这两种解决方案才值得开销。在上面显示的示例中,您将只比较集合 {1, 2, 3} 和 {4, 5} 内的行,而不是集合之间的行。
然而在最坏的情况下(M条记录,N个特征)我们可以进行N M2比较,而不是M2
你可以借鉴局部敏感散列的思想。这是一种方法:
- 根据您的匹配要求定义一组哈希键。您将使用这些键来查找潜在的匹配项。例如,一个可能的散列键可以基于电影演员向量。
- 对每个键执行归约。这将给出一组潜在的匹配项。对于每个可能的匹配集,执行您的 "exact match"。完全匹配将产生完全匹配集。
- 运行 连通分量算法执行集合合并以获得所有精确匹配的集合。
我已经使用上述方法实现了类似的东西。
希望对您有所帮助。
另一种可能的解决方案是使用内置 RowMatrix 和强力 columnSimilarity,如数据块上所述:
https://databricks.com/blog/2014/10/20/efficient-similarity-algorithm-now-in-spark-twitter.html
https://datascience.stackexchange.com/questions/14862/spark-item-similarity-recommendation
备注:
- 请记住,在生成的相似性矩阵中总是有 N^2 个值
- 您必须连接稀疏向量
另一种想法..鉴于您的矩阵相对较小且稀疏,它可以使用 breeze CSCMatrix[Int].
放入内存中然后,您可以使用 A'B (A.transposed * B) 计算 co-occurrences,然后选择每对的 LLR(对数似然比)的 TopN。在这里,由于每行仅保留 10 个顶部项目,因此输出矩阵也将非常稀疏。
您可以在这里查看详细信息:
我在类似场景中使用过的一个非常重要的建议是如果某部电影
relation similarity score
A-> B 8/10
B->C 7/10
C->D 9/10
If
E-> A 4 //less that some threshold or hyperparameter
Don't calculate similarity for
E-> B
E-> C
E->D