How to measure similarity score between one row and matrix/table in pyspark?

I have a user preference table:

+-------+------+-------+-------+
|user_id|Action| Comedy|Fantasy|
+-------+------+-------+-------+
|   100 |  0   | 0.33..| 0.66..|
|   101 |0.42..| 0.15..| 0.57..|
+-------+------+-------+-------+

and a movie genre content table:

+-------+------+-------+-------+
|movieId|Action| Comedy|Fantasy|
+-------+------+-------+-------+
|  1001 |  1   |   1   |   0   |
|  1011 |  0   |   1   |   1   |
+-------+------+-------+-------+

How can I take the dot product (as a similarity measure) of a user's preference row (looked up by user_id) with each movie content row, so that for each user I can output the best-matching movieId based on movie genres? Either RDD or DataFrame format is fine.

Here is my attempt.
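The code below reads the two tables from CSV files. As a minimal, self-contained alternative (a sketch, assuming a running SparkSession named spark), the same example data could also be created inline:

# Hypothetical inline construction of the example data shown above,
# instead of reading test1.csv / test2.csv.
df1 = spark.createDataFrame(
    [(100, 0.0, 0.33, 0.66), (101, 0.42, 0.15, 0.57)],
    ["user_id", "Action", "Comedy", "Fantasy"],
)
df2 = spark.createDataFrame(
    [(1001, 1, 1, 0), (1011, 0, 1, 1)],
    ["movieId", "Action", "Comedy", "Fantasy"],
)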

The crossJoin pairs every user_id with every movieId, so it produces a dataframe with (# of user_ids) * (# of movieIds) rows.
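As a quick sanity check of that size claim (a sketch to run once df1 and df2 from the code below are loaded):

# The cross join has (# of user_ids) * (# of movieIds) rows.
assert df1.crossJoin(df2).count() == df1.count() * df2.count()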

Then you can multiply the two arrays element by element with zip_with, which applies a given function to each pair of elements, in this case x * y for each element x of array1 and element y of array2.
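For illustration, zip_with on two literal arrays behaves like this (a quick check run through spark.sql):

# zip_with multiplies matching positions: [1*3, 2*4] -> [3.0, 8.0]
spark.sql("SELECT zip_with(array(1D, 2D), array(3D, 4D), (x, y) -> x * y) AS products").show()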

Finally, you can aggregate the element-wise products, i.e. sum them. Starting from sum = 0, each element x of zipArray is added to the accumulator sum, which is just the usual summation.
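Chaining the two, the dot product of two small literal arrays can be checked directly in SQL (a minimal illustration):

# aggregate sums the element-wise products: 1*3 + 2*4 = 11.0
spark.sql("""
    SELECT aggregate(
             zip_with(array(1D, 2D), array(3D, 4D), (x, y) -> x * y),
             0D,
             (sum, x) -> sum + x
           ) AS dot_product
""").show()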

from pyspark.sql.functions import array, expr, rank, desc

# Load the user preference and movie genre tables.
df1 = spark.read.option("header", "true").option("inferSchema", "true").csv("test1.csv")
df2 = spark.read.option("header", "true").option("inferSchema", "true").csv("test2.csv")

# Keep only the genre columns (everything except the id column).
df1_cols = df1.columns
df1_cols.remove('user_id')
df2_cols = df2.columns
df2_cols.remove('movieId')

# Pack the genre columns of each table into a single array column.
df1 = df1.withColumn('array1', array(df1_cols))
df2 = df2.withColumn('array2', array(df2_cols))

# Pair every user with every movie.
df3 = df1.crossJoin(df2)
df3.show(10, False)

+-------+------+------+-------+------------------+-------+------+------+-------+---------+
|user_id|Action|Comedy|Fantasy|array1            |movieId|Action|Comedy|Fantasy|array2   |
+-------+------+------+-------+------------------+-------+------+------+-------+---------+
|100    |0.0   |0.33  |0.66   |[0.0, 0.33, 0.66] |1001   |1     |1     |0      |[1, 1, 0]|
|100    |0.0   |0.33  |0.66   |[0.0, 0.33, 0.66] |1011   |0     |1     |1      |[0, 1, 1]|
|101    |0.42  |0.15  |0.57   |[0.42, 0.15, 0.57]|1001   |1     |1     |0      |[1, 1, 0]|
|101    |0.42  |0.15  |0.57   |[0.42, 0.15, 0.57]|1011   |0     |1     |1      |[0, 1, 1]|
+-------+------+------+-------+------------------+-------+------+------+-------+---------+


# Multiply the arrays element-wise, then sum the products to get the dot product.
df3 = df3.withColumn('zipArray',   expr("zip_with(array1, array2, (x, y) -> x * y)")) \
         .withColumn('dotProduct', expr("aggregate(zipArray, 0D, (sum, x) -> sum + x)"))

df3.show(10, False)

+-------+------+------+-------+------------------+-------+------+------+-------+---------+-----------------+----------+
|user_id|Action|Comedy|Fantasy|array1            |movieId|Action|Comedy|Fantasy|array2   |zipArray         |dotProduct|
+-------+------+------+-------+------------------+-------+------+------+-------+---------+-----------------+----------+
|100    |0.0   |0.33  |0.66   |[0.0, 0.33, 0.66] |1001   |1     |1     |0      |[1, 1, 0]|[0.0, 0.33, 0.0] |0.33      |
|100    |0.0   |0.33  |0.66   |[0.0, 0.33, 0.66] |1011   |0     |1     |1      |[0, 1, 1]|[0.0, 0.33, 0.66]|0.99      |
|101    |0.42  |0.15  |0.57   |[0.42, 0.15, 0.57]|1001   |1     |1     |0      |[1, 1, 0]|[0.42, 0.15, 0.0]|0.57      |
|101    |0.42  |0.15  |0.57   |[0.42, 0.15, 0.57]|1011   |0     |1     |1      |[0, 1, 1]|[0.0, 0.15, 0.57]|0.72      |
+-------+------+------+-------+------------------+-------+------+------+-------+---------+-----------------+----------+
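As a side note, Spark 3.1+ also exposes zip_with and aggregate directly in pyspark.sql.functions, so the same dot product can be written without expr (a sketch, assuming Spark 3.1 or later and the array1/array2 columns built above; df3_alt is just an illustrative name):

from pyspark.sql import functions as F

# Same computation with the native higher-order functions (Spark 3.1+).
df3_alt = df1.crossJoin(df2).withColumn(
    'dotProduct',
    F.aggregate(
        F.zip_with('array1', 'array2', lambda x, y: x * y),
        F.lit(0.0),
        lambda acc, x: acc + x,
    ),
)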


from pyspark.sql import Window

# Rank each user's movies by dot product, highest first.
window = Window.partitionBy('user_id').orderBy(desc('dotProduct'))

df3.select('user_id', 'movieId', 'dotProduct') \
   .withColumn('rank', rank().over(window)) \
   .filter('rank = 1') \
   .drop('rank') \
   .show(10, False)

+-------+-------+----------+
|user_id|movieId|dotProduct|
+-------+-------+----------+
|101    |1011   |0.72      |
|100    |1011   |0.99      |
+-------+-------+----------+
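Note that rank keeps every movie that ties for the best score. If exactly one movieId per user is wanted, row_number() can be swapped in (a small variation on the code above):

from pyspark.sql.functions import row_number

# row_number breaks ties arbitrarily, so each user gets exactly one movieId.
df3.select('user_id', 'movieId', 'dotProduct') \
   .withColumn('rn', row_number().over(window)) \
   .filter('rn = 1') \
   .drop('rn') \
   .show(10, False)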