How to get cosine similarity scores for all user-item pairs in PySpark, given user and item embeddings?

I have a user df:

df1 = spark.createDataFrame([
    ("u1", [0., 2., 3.]),
    ("u2", [1., 0., 0.]),
    ("u3", [0., 0., 3.]),
    ],
    ['user_id', 'features'])

df1.printSchema()
df1.show(truncate=False)

Output:

root
 |-- user_id: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: double (containsNull = true)

+-------+---------------+
|user_id|features       |
+-------+---------------+
|u1     |[0.0, 2.0, 3.0]|
|u2     |[1.0, 0.0, 0.0]|
|u3     |[0.0, 0.0, 3.0]|
+-------+---------------+

And an item df:

df2 = spark.createDataFrame([
    ("i1", [0., 2., 3.]),
    ("i2", [1.1, 0., 0.]),
    ("i3", [0., 0., 3.1]),
    ],
    ['item_id', 'features'])

df2.printSchema()
df2.show(truncate=False)

Output:

root
 |-- item_id: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: double (containsNull = true)

+-------+---------------+
|item_id|features       |
+-------+---------------+
|i1     |[0.0, 2.0, 3.0]|
|i2     |[1.1, 0.0, 0.0]|
|i3     |[0.0, 0.0, 3.1]|
+-------+---------------+

How can I compute cosine similarity scores for all user-item pairs, so that I can easily rank the items for each user?

The final dataframe should look something like:

+-------+-------+-----------------+
|user_id|item_id|cosine_similarity|
+-------+-------+-----------------+
|u1     |     i1|      some number|
|u1     |     i2|      some number|
|u1     |     i3|      some number|
|u2     |     i1|      some number|
|u2     |     i2|      some number|
|u2     |     i3|      some number|
|u3     |     i1|      some number|
|u3     |     i2|      some number|
|u3     |     i3|      some number|
+-------+-------+-----------------+

A manual implementation of cosine similarity:

import pyspark.sql.functions as F

# All vectors are assumed to have the same length; read it from the first row
size = df1.limit(1).select(F.size('features')).first()[0]

# Cross join pairs every user with every item
joined = df1.crossJoin(df2.withColumnRenamed('features', 'features2'))

# cosine = dot(u, v) / (||u|| * ||v||), expanded element by element
result = joined.select(
    'user_id',
    'item_id',
    sum([F.col('features')[i] * F.col('features2')[i] for i in range(size)]).alias('dot_product'),
    F.sqrt(sum([F.col('features')[i] * F.col('features')[i] for i in range(size)])).alias('norm1'),
    F.sqrt(sum([F.col('features2')[i] * F.col('features2')[i] for i in range(size)])).alias('norm2')
).selectExpr(
    'user_id',
    'item_id',
    'dot_product / norm1 / norm2 as cosine_similarity'
)

result.show()
+-------+-------+------------------+
|user_id|item_id| cosine_similarity|
+-------+-------+------------------+
|     u1|     i1|1.0000000000000002|
|     u1|     i2|               0.0|
|     u1|     i3|0.8320502943378437|
|     u2|     i1|               0.0|
|     u2|     i2|               1.0|
|     u2|     i3|               0.0|
|     u3|     i1|0.8320502943378437|
|     u3|     i2|               0.0|
|     u3|     i3|               1.0|
+-------+-------+------------------+
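
If you're on Spark 3.1+, where pyspark.sql.functions exposes the higher-order functions zip_with and aggregate, the same arithmetic can be written without hard-coding the vector length. A minimal sketch (the dot helper is mine, not part of the answer above):

from pyspark.sql import functions as F

# Hypothetical helper: dot product of two array columns
def dot(a, b):
    return F.aggregate(
        F.zip_with(a, b, lambda x, y: x * y),  # element-wise products
        F.lit(0.0),                            # accumulator seed
        lambda acc, v: acc + v                 # running sum
    )

joined = df1.crossJoin(df2.withColumnRenamed('features', 'features2'))
result = joined.select(
    'user_id',
    'item_id',
    (dot(F.col('features'), F.col('features2'))
     / F.sqrt(dot(F.col('features'), F.col('features')))
     / F.sqrt(dot(F.col('features2'), F.col('features2')))
    ).alias('cosine_similarity')
)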

Here is an approach using sklearn and the underlying RDD (scikit-learn must be installed on the executors):

from pyspark.sql import functions as F
from sklearn.metrics.pairwise import cosine_similarity

# Join DFs
df = df1.crossJoin(df2.select('item_id', F.col("features").alias("features_item")))

# Get cosine similarity
result = df.rdd.map(
    lambda x: (
        x['user_id'],
        x['item_id'],
        float(cosine_similarity([x['features']], [x['features_item']])[0, 0])
    )
).toDF(schema=['user_id', 'item_id', 'cosine_similarity'])
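
Whichever approach you use to build result, the per-user ranking the question asks for is then a window function away. A small sketch, assuming the result dataframe from either answer:

from pyspark.sql import Window
from pyspark.sql import functions as F

# Rank items within each user by descending similarity
w = Window.partitionBy('user_id').orderBy(F.desc('cosine_similarity'))
ranked = result.withColumn('rank', F.row_number().over(w))

# e.g. keep each user's top 2 items
ranked.filter(F.col('rank') <= 2).show()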