在 python spark (pyspark) 中将用户产品视图转换为网络 matrix/graph

Turn user product views into network matrix/graph in python spark (pyspark)

我正在处理包括用户 ID 和 products/items 这些用户查看过的网站数据。我创建了一个看起来像这样的 pyspark 数据框:

+--------+----------+-------+----------+---------+
|  UserId|  productA|  itemB|  articleC|  objectD|
+--------+----------+-------+----------+---------+
|   user1|         1|      1|      null|     null|
|   user2|         1|      1|      null|     null|
|   user3|      null|      1|         1|     null|
|   user4|      null|   null|      null|        1|
+--------+----------+-------+----------+---------+

其中 1 表示用户至少查看过该产品一次,null 表示用户没有查看过该产品。有数百 products/items 和数百万用户(这只是一个简化的示例)。

我想在 pyspark 中执行一个操作来获得这样的 DataFrame:

+-----------+----------+-------+----------+---------+
|           |  productA|  itemB|  articleC|  objectD|
+-----------+----------+-------+----------+---------+
|   productA|         2|      2|         0|        0|
|      itemB|         2|      3|         1|        0|
|   articleC|         0|      1|         1|        0|
|    objectD|         0|      0|         0|        1|
+-----------+----------+-------+----------+---------+

这个 Dataframe 显示了用户的数量,如果他们查看了一个 product/item,还查看了另一个项目。显然,这个 Dataframe 的对角线是查看每个产品的用户数,但有趣的部分是对称的非对角线值。在这个简化的示例中,您可以看到查看产品 A 的所有用户也查看了项目 B,但是对于查看项目 B 的 3 个用户,只有 2 个用户查看了产品 A。

我创建了一个非常低效的例程来计算这个,但由于数据集的大小,它需要大约 22 小时才能完成。我如何利用 pyspark 的功能使 运行 以下的计算更快?

import numpy as np
import pandas as pd
import pyspark.sql.functions as F

# df_pivot is the name of the first Dataframe in my explanation above
columns = [c for c in df_pivot.columns]
cols = columns[1:]
net = pd.DataFrame(np.zeros((len(cols), len(cols))), index=cols, columns=cols)

for i in range(len(cols)):
  c = cols[i]
  cs = cols[i:]
  print(f'{i + 1}: {c}')
  sum_row = df_pivot.where(F.col(c).isNotNull())\
                    .select(*cs)\
                    .groupBy().sum().collect()[0]\
                    .asDict()
  
  sum_row = {k.replace('sum(', '')[:-1]: v for k, v in sum_row.items()}
  values = [sum_row[x] for x in cs]
  net.loc[c, cs] = values
  net.loc[cs, c] = values

net.head()

更新

通过与同事交谈,我们找到了一种方法(如果我们可以将数据放入 pandas DataFrame 而不会出现内存错误),方法是将数据转换为 scipy csc_matrix, and then taking the gramian像这样的矩阵:

gramian = sp_csc.transpose().dot(sp_csc)

其中 sp_csc 是 scipy“压缩稀疏列矩阵”。

将 pyspark DataFrame 强制为 pandas 似乎仍然受到数据大小的限制。在 pyspark 中,有没有更好的方法来计算 gramian(pyspark DataFrame 和 pyspark DataFrame 本身的转置点积)?

更新 2

我找到了一种使原始 code/loop 运行 更快的方法。我需要在循环之前使用 df_pivot.cache() 命令缓存 df_pivot 数据帧。由于 pyspark 的延迟计算,循环导致 pyspark 在每个循环期间重新计算所有先前的计算。虽然这解决了我快速计算这个问题的迫切需要,但我仍然对有人如何使用 parallelizemapreduce 例程在 pyspark 中执行此操作感兴趣?

IIUC,你可以 unpivot 原始数据帧 df_pivot 然后从那里做一个自我 full -outer 使用 userId 加入,然后再次执行 pivot

from pyspark.sql import functions as F

# list of columns to do pivot
cols = df_pivot.columns[1:]

# normalize the df_pivot to userId vs target
df1 = df_pivot.select(
    'userId', 
    F.explode(F.split(F.concat_ws('|', *[F.when(F.col(c).isNotNull(), F.lit(c)) for c in cols]),'\|')).alias('target')
)
#df1.show()
#+------+--------+
#|userId|  target|
#+------+--------+
#| user1|productA|
#| user1|   itemB|
#| user2|productA|
#| user2|   itemB|
#| user3|   itemB|
#| user3|articleC|
#| user4| objectD|
#+------+--------+

# self full-outer join
df2 = df1.join(df1.withColumnRenamed('target','target_1'),'userId','full')

# pivot
df_new = df2.groupby('target') \
    .pivot('target_1', cols) \
    .agg(F.countDistinct('userId')) \
    .fillna(0, subset=cols)
#+--------+--------+-----+--------+-------+
#|  target|productA|itemB|articleC|objectD|
#+--------+--------+-----+--------+-------+
#|productA|       2|    2|       0|      0|
#|   itemB|       2|    3|       1|      0|
#|articleC|       0|    1|       1|      0|
#| objectD|       0|    0|       0|      1|
#+--------+--------+-----+--------+-------+

注意:您可能只需要F.count('*')而不是F.countDistinct('userId')在最终聚合中根据您的实际需求。