使用数据框 Scala Spark 计算余弦相似度

Calculate cosine similarity with a dataframe Scala Spark

我有一个这种形式的数据框:

+-------+-------+------------------+-------+----+
|userId1|movieId|              rat1|userId2|rat2|
+-------+-------+------------------+-------+----+
|      1|      1|               1.0|      2| 1.0|
|      1|      2|               1.0|      2| 2.0|
|      1|      3|               2.0|      2| 3.0|
|      2|      1|               1.0|      3| 0.0|
|      2|      2|               2.0|      3| 0.0|
|      2|      3|               3.0|      3| 0.0|
|      3|      1|               0.0|      1| 1.0|
|      3|      2|               0.0|      1| 1.0|

....

其中 rat1 和 rat2 是用户 1 和用户 2 的评级。 我想要的是计算两个用户之间的余弦相似度,我的想法是从此数据框中提取数组,然后计算余弦相似度,例如:

arrayUser1 = (1,1,2)
arrayUser2 = (1,2,3)
arrayUser3 = (0,0,0)

问题是我不知道如何提取这些数组,有人有解决方案吗?或者以更好的方式计算相似度的提示?

您可以使用数据框 groupBy 操作并进行 collect_set 聚合

下面是示例代码。

scala> someDF.show
+-------+-------+----+-------+----+
|userId1|movieId|rat1|userId2|rat2|
+-------+-------+----+-------+----+
|      1|      1| 1.0|      2| 1.0|
|      1|      2| 1.0|      2| 2.0|
|      1|      3| 2.0|      2| 3.0|
|      2|      1| 1.0|      3| 0.0|
|      2|      2| 2.0|      3| 0.0|
|      2|      3| 3.0|      3| 0.0|
|      3|      1| 0.0|      1| 1.0|
|      3|      2| 0.0|      1| 1.0|
+-------+-------+----+-------+----+

scala> someDF.groupBy("userId1").agg(collect_set("rat1").alias("ratinglist")).show
+-------+---------------+
|userId1|     ratinglist|
+-------+---------------+
|      1|     [2.0, 1.0]|
|      3|          [0.0]|
|      2|[2.0, 1.0, 3.0]|
+-------+---------------+

可以先将rat1和rat2相乘,再按userId1和userId2分组,求和:

df.show
+-------+-------+----+-------+----+
|userId1|movieId|rat1|userId2|rat2|
+-------+-------+----+-------+----+
|      1|      1| 1.0|      2| 1.0|
|      1|      2| 1.0|      2| 2.0|
|      1|      3| 2.0|      2| 3.0|
|      2|      1| 1.0|      3| 0.0|
|      2|      2| 2.0|      3| 0.0|
|      2|      3| 3.0|      3| 0.0|
|      3|      1| 0.0|      1| 1.0|
|      3|      2| 0.0|      1| 1.0|
|      3|      3| 0.0|      1| 2.0|
+-------+-------+----+-------+----+
val cos_sim = df.withColumn(
    "rat1",    // normalize rat1
    coalesce(
        $"rat1" / sqrt(sum($"rat1" * $"rat1").over(Window.partitionBy("userId1"))),
        lit(0)
    )
).withColumn(
    "rat2",    // normalize rat2
    coalesce(
        $"rat2" / sqrt(sum($"rat2" * $"rat2").over(Window.partitionBy("userId2"))),
        lit(0)
    )
).withColumn(
    "rat1_times_rat2",
    $"rat1" * $"rat2"
).groupBy("userId1", "userId2").agg(sum("rat1_times_rat2").alias("cos_sim"))

cos_sim.show
+-------+-------+-----------------+
|userId1|userId2|          cos_sim|
+-------+-------+-----------------+
|      3|      1|              0.0|
|      2|      3|              0.0|
|      1|      2|0.981980506061966|
+-------+-------+-----------------+