How to measure similarity score between one row and matrix/table in pyspark?
I have a user preference table:
+-------+------+-------+-------+
|user_id|Action| Comedy|Fantasy|
+-------+------+-------+-------+
|    100|     0| 0.33..| 0.66..|
|    101|0.42..| 0.15..| 0.57..|
+-------+------+-------+-------+
and a movie genre content table:
+-------+------+------+-------+
|movieId|Action|Comedy|Fantasy|
+-------+------+------+-------+
|   1001|     1|     1|      0|
|   1011|     0|     1|      1|
+-------+------+------+-------+
How can I take the dot product (as a similarity measure) between a user's preference row (looked up by user_id) and each movie content row, so that the best-matching movieId for that user is returned based on movie genres? Either an RDD or a DataFrame solution is fine.

Here is my attempt.
A cross join (crossJoin) pairs every user_id row with every movieId row, so the resulting DataFrame has (number of user_id values) * (number of movieId values) rows.

Then zip_with multiplies the two arrays element by element using the function you supply; here that function is x * y, where x is an element of array1 and y is the corresponding element of array2.

Finally, aggregate sums the element-wise products: starting from sum = 0, each element x of zipArray is added to the accumulator sum, which is just the usual summation.
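As a minimal, self-contained illustration of this zip_with / aggregate pattern on literal arrays (a sketch that assumes only an active SparkSession named spark and Spark 2.4+, where these higher-order SQL functions became available):

spark.sql("""
    SELECT aggregate(
               zip_with(array(0.0D, 0.33D, 0.66D), array(1, 1, 0), (x, y) -> x * y),
               0D,
               (acc, x) -> acc + x
           ) AS dotProduct
""").show()
# 0.0*1 + 0.33*1 + 0.66*0 = 0.33

The full pipeline below applies the same pattern to the crossed DataFrame.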
from pyspark.sql.functions import array, expr, rank, desc
df1 = spark.read.option("header","true").option("inferSchema","true").csv("test1.csv")
df2 = spark.read.option("header","true").option("inferSchema","true").csv("test2.csv")
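# If test1.csv / test2.csv are not at hand, the same df1 / df2 can be built directly
# from the example values shown above (a sketch; the literal values are the only assumption):
# df1 = spark.createDataFrame([(100, 0.0, 0.33, 0.66), (101, 0.42, 0.15, 0.57)],
#                             ['user_id', 'Action', 'Comedy', 'Fantasy'])
# df2 = spark.createDataFrame([(1001, 1, 1, 0), (1011, 0, 1, 1)],
#                             ['movieId', 'Action', 'Comedy', 'Fantasy'])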
# Collect the genre columns (everything except the id column) into one array column per table.
df1_cols = df1.columns
df1_cols.remove('user_id')
df2_cols = df2.columns
df2_cols.remove('movieId')
df1 = df1.withColumn('array1', array(df1_cols))
df2 = df2.withColumn('array2', array(df2_cols))
# Pair every user with every movie: (# of user_id) * (# of movieId) rows.
df3 = df1.crossJoin(df2)
df3.show(10, False)
+-------+------+------+-------+------------------+-------+------+------+-------+---------+
|user_id|Action|Comedy|Fantasy|array1 |movieId|Action|Comedy|Fantasy|array2 |
+-------+------+------+-------+------------------+-------+------+------+-------+---------+
|100 |0.0 |0.33 |0.66 |[0.0, 0.33, 0.66] |1001 |1 |1 |0 |[1, 1, 0]|
|100 |0.0 |0.33 |0.66 |[0.0, 0.33, 0.66] |1011 |0 |1 |1 |[0, 1, 1]|
|101 |0.42 |0.15 |0.57 |[0.42, 0.15, 0.57]|1001 |1 |1 |0 |[1, 1, 0]|
|101 |0.42 |0.15 |0.57 |[0.42, 0.15, 0.57]|1011 |0 |1 |1 |[0, 1, 1]|
+-------+------+------+-------+------------------+-------+------+------+-------+---------+
# Element-wise product of the two arrays, then the sum of those products: the dot product.
df3 = df3.withColumn('zipArray', expr("zip_with(array1, array2, (x, y) -> x * y)")) \
         .withColumn('dotProduct', expr("aggregate(zipArray, 0D, (sum, x) -> sum + x)"))
df3.show(10, False)
+-------+------+------+-------+------------------+-------+------+------+-------+---------+-----------------+----------+
|user_id|Action|Comedy|Fantasy|array1 |movieId|Action|Comedy|Fantasy|array2 |zipArray |dotProduct|
+-------+------+------+-------+------------------+-------+------+------+-------+---------+-----------------+----------+
|100 |0.0 |0.33 |0.66 |[0.0, 0.33, 0.66] |1001 |1 |1 |0 |[1, 1, 0]|[0.0, 0.33, 0.0] |0.33 |
|100 |0.0 |0.33 |0.66 |[0.0, 0.33, 0.66] |1011 |0 |1 |1 |[0, 1, 1]|[0.0, 0.33, 0.66]|0.99 |
|101 |0.42 |0.15 |0.57 |[0.42, 0.15, 0.57]|1001 |1 |1 |0 |[1, 1, 0]|[0.42, 0.15, 0.0]|0.57 |
|101 |0.42 |0.15 |0.57 |[0.42, 0.15, 0.57]|1011 |0 |1 |1 |[0, 1, 1]|[0.0, 0.15, 0.57]|0.72 |
+-------+------+------+-------+------------------+-------+------+------+-------+---------+-----------------+----------+
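On Spark 3.1 or later, the same step can also be written with the Python-level higher-order functions instead of SQL expression strings. A sketch, assuming the same df3 with the array1 / array2 columns built above:

from pyspark.sql import functions as F

df3_alt = df3.withColumn(
    'dotProduct',
    F.aggregate(
        F.zip_with('array1', 'array2', lambda x, y: x * y),  # element-wise products
        F.lit(0.0),                                           # start value (double)
        lambda acc, x: acc + x                                # running sum
    )
)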
from pyspark.sql import Window

# For each user, rank the movies by dot product and keep the best match.
window = Window.partitionBy('user_id').orderBy(desc('dotProduct'))
df3.select('user_id', 'movieId', 'dotProduct') \
   .withColumn('rank', rank().over(window)) \
   .filter('rank = 1') \
   .drop('rank') \
   .show(10, False)
+-------+-------+----------+
|user_id|movieId|dotProduct|
+-------+-------+----------+
|101 |1011 |0.72 |
|100 |1011 |0.99 |
+-------+-------+----------+
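Because the genre vectors can have different norms, cosine similarity may be preferable to the raw dot product. A sketch building on df3 from above (it assumes only the column names already used there; guard against zero-norm vectors if they can occur):

from pyspark.sql.functions import expr

# Normalize the dot product by the two vector norms to get cosine similarity.
df_cos = df3.withColumn('norm1', expr("sqrt(aggregate(array1, 0D, (acc, x) -> acc + x * x))")) \
            .withColumn('norm2', expr("sqrt(aggregate(array2, 0D, (acc, x) -> acc + x * x))")) \
            .withColumn('cosine', expr("dotProduct / (norm1 * norm2)"))

The same window ranking then works with cosine in place of dotProduct.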