在 Pyspark 中创建项目-项目交互矩阵

Create item- item Interaction Matrix in Pyspark

我有一个包含两列的数据集,user_id 和 item_id。 DataFrame 如下所示:

index user_id item_id
0     user1   A
1     user1   B
2     user2   A
3     user3   B
4     user4   C

我正在寻找一种方法将此 table 转换为项目-项目交互矩阵,其中我们在项目之间有 普通用户 的不同交集:

       A   B   C
A      2   1   0
B      1   2   0
C      0   0   1

还有另一个项目-项目交互矩阵,我们在项目之间有 不同的用户联合

       A   B   C
A      2   3   3
B      3   2   3
C      3   3   1

第 0 步。定义数据框

import pyspark.sql.functions as F

data = [(0,     "user1",   "A"),
        (1,     "user1",   "B"),
        (2,     "user2",   "A"),
        (3,     "user3",   "B"),
        (4,     "user4",   "C")]
df = spark.createDataFrame(data, schema=["index", "user_id", "item_id"])

第 1 步。为 df_collect

中的每个项目收集用户数据
df_collect = (df
        .select("user_id", "item_id")
        .groupBy("item_id")
        .agg(F.collect_set("user_id").alias("users")))

第 2 步。交叉连接 df_collect 与自身以获得所有项-项组合

df_crossjoin = (df_collect
                    .join(df_collect
                            .withColumnRenamed("item_id", "item_y")
                            .withColumnRenamed("users", "users_y")))

步骤 2. 查找用户并集和交集以及计数

df_ui = (df_crossjoin
                 .withColumn("users_union", 
                         F.size((F.array_union("users", "users_y"))))
                 .withColumn("users_intersect", 
                             F.size(F.array_intersect("users", "users_y"))))
    

第 3 步。旋转以获得项-项矩阵

df_matrix_union = (df_ui
                   .groupBy("item_id")
                   .pivot("item_y")
                   .agg(F.first("users_union"))
                   .orderBy("item_id"))

df_matrix_intrsct = (df_ui
                   .groupBy("item_id")
                   .pivot("item_y")
                   .agg(F.first("users_intersect"))
                   .orderBy("item_id"))