在 python spark (pyspark) 中将用户产品视图转换为网络 matrix/graph
Turn user product views into network matrix/graph in python spark (pyspark)
我正在处理包括用户 ID 和 products/items 这些用户查看过的网站数据。我创建了一个看起来像这样的 pyspark 数据框:
+--------+----------+-------+----------+---------+
| UserId| productA| itemB| articleC| objectD|
+--------+----------+-------+----------+---------+
| user1| 1| 1| null| null|
| user2| 1| 1| null| null|
| user3| null| 1| 1| null|
| user4| null| null| null| 1|
+--------+----------+-------+----------+---------+
其中 1 表示用户至少查看过该产品一次,null 表示用户没有查看过该产品。有数百 products/items 和数百万用户(这只是一个简化的示例)。
我想在 pyspark 中执行一个操作来获得这样的 DataFrame:
+-----------+----------+-------+----------+---------+
| | productA| itemB| articleC| objectD|
+-----------+----------+-------+----------+---------+
| productA| 2| 2| 0| 0|
| itemB| 2| 3| 1| 0|
| articleC| 0| 1| 1| 0|
| objectD| 0| 0| 0| 1|
+-----------+----------+-------+----------+---------+
这个 Dataframe 显示了用户的数量,如果他们查看了一个 product/item,还查看了另一个项目。显然,这个 Dataframe 的对角线是查看每个产品的用户数,但有趣的部分是对称的非对角线值。在这个简化的示例中,您可以看到查看产品 A 的所有用户也查看了项目 B,但是对于查看项目 B 的 3 个用户,只有 2 个用户查看了产品 A。
我创建了一个非常低效的例程来计算这个,但由于数据集的大小,它需要大约 22 小时才能完成。我如何利用 pyspark 的功能使 运行 以下的计算更快?
import numpy as np
import pandas as pd
import pyspark.sql.functions as F
# df_pivot is the name of the first Dataframe in my explanation above
columns = [c for c in df_pivot.columns]
cols = columns[1:]
net = pd.DataFrame(np.zeros((len(cols), len(cols))), index=cols, columns=cols)
for i in range(len(cols)):
c = cols[i]
cs = cols[i:]
print(f'{i + 1}: {c}')
sum_row = df_pivot.where(F.col(c).isNotNull())\
.select(*cs)\
.groupBy().sum().collect()[0]\
.asDict()
sum_row = {k.replace('sum(', '')[:-1]: v for k, v in sum_row.items()}
values = [sum_row[x] for x in cs]
net.loc[c, cs] = values
net.loc[cs, c] = values
net.head()
更新
通过与同事交谈,我们找到了一种方法(如果我们可以将数据放入 pandas DataFrame 而不会出现内存错误),方法是将数据转换为 scipy csc_matrix, and then taking the gramian像这样的矩阵:
gramian = sp_csc.transpose().dot(sp_csc)
其中 sp_csc
是 scipy“压缩稀疏列矩阵”。
将 pyspark DataFrame 强制为 pandas 似乎仍然受到数据大小的限制。在 pyspark 中,有没有更好的方法来计算 gramian(pyspark DataFrame 和 pyspark DataFrame 本身的转置点积)?
更新 2
我找到了一种使原始 code/loop 运行 更快的方法。我需要在循环之前使用 df_pivot.cache()
命令缓存 df_pivot
数据帧。由于 pyspark 的延迟计算,循环导致 pyspark 在每个循环期间重新计算所有先前的计算。虽然这解决了我快速计算这个问题的迫切需要,但我仍然对有人如何使用 parallelize
、map
和 reduce
例程在 pyspark 中执行此操作感兴趣?
IIUC,你可以 unpivot 原始数据帧 df_pivot 然后从那里做一个自我 full -outer 使用 userId
加入,然后再次执行 pivot。
from pyspark.sql import functions as F
# list of columns to do pivot
cols = df_pivot.columns[1:]
# normalize the df_pivot to userId vs target
df1 = df_pivot.select(
'userId',
F.explode(F.split(F.concat_ws('|', *[F.when(F.col(c).isNotNull(), F.lit(c)) for c in cols]),'\|')).alias('target')
)
#df1.show()
#+------+--------+
#|userId| target|
#+------+--------+
#| user1|productA|
#| user1| itemB|
#| user2|productA|
#| user2| itemB|
#| user3| itemB|
#| user3|articleC|
#| user4| objectD|
#+------+--------+
# self full-outer join
df2 = df1.join(df1.withColumnRenamed('target','target_1'),'userId','full')
# pivot
df_new = df2.groupby('target') \
.pivot('target_1', cols) \
.agg(F.countDistinct('userId')) \
.fillna(0, subset=cols)
#+--------+--------+-----+--------+-------+
#| target|productA|itemB|articleC|objectD|
#+--------+--------+-----+--------+-------+
#|productA| 2| 2| 0| 0|
#| itemB| 2| 3| 1| 0|
#|articleC| 0| 1| 1| 0|
#| objectD| 0| 0| 0| 1|
#+--------+--------+-----+--------+-------+
注意:您可能只需要F.count('*')
而不是F.countDistinct('userId')
在最终聚合中根据您的实际需求。
我正在处理包括用户 ID 和 products/items 这些用户查看过的网站数据。我创建了一个看起来像这样的 pyspark 数据框:
+--------+----------+-------+----------+---------+
| UserId| productA| itemB| articleC| objectD|
+--------+----------+-------+----------+---------+
| user1| 1| 1| null| null|
| user2| 1| 1| null| null|
| user3| null| 1| 1| null|
| user4| null| null| null| 1|
+--------+----------+-------+----------+---------+
其中 1 表示用户至少查看过该产品一次,null 表示用户没有查看过该产品。有数百 products/items 和数百万用户(这只是一个简化的示例)。
我想在 pyspark 中执行一个操作来获得这样的 DataFrame:
+-----------+----------+-------+----------+---------+
| | productA| itemB| articleC| objectD|
+-----------+----------+-------+----------+---------+
| productA| 2| 2| 0| 0|
| itemB| 2| 3| 1| 0|
| articleC| 0| 1| 1| 0|
| objectD| 0| 0| 0| 1|
+-----------+----------+-------+----------+---------+
这个 Dataframe 显示了用户的数量,如果他们查看了一个 product/item,还查看了另一个项目。显然,这个 Dataframe 的对角线是查看每个产品的用户数,但有趣的部分是对称的非对角线值。在这个简化的示例中,您可以看到查看产品 A 的所有用户也查看了项目 B,但是对于查看项目 B 的 3 个用户,只有 2 个用户查看了产品 A。
我创建了一个非常低效的例程来计算这个,但由于数据集的大小,它需要大约 22 小时才能完成。我如何利用 pyspark 的功能使 运行 以下的计算更快?
import numpy as np
import pandas as pd
import pyspark.sql.functions as F
# df_pivot is the name of the first Dataframe in my explanation above
columns = [c for c in df_pivot.columns]
cols = columns[1:]
net = pd.DataFrame(np.zeros((len(cols), len(cols))), index=cols, columns=cols)
for i in range(len(cols)):
c = cols[i]
cs = cols[i:]
print(f'{i + 1}: {c}')
sum_row = df_pivot.where(F.col(c).isNotNull())\
.select(*cs)\
.groupBy().sum().collect()[0]\
.asDict()
sum_row = {k.replace('sum(', '')[:-1]: v for k, v in sum_row.items()}
values = [sum_row[x] for x in cs]
net.loc[c, cs] = values
net.loc[cs, c] = values
net.head()
更新
通过与同事交谈,我们找到了一种方法(如果我们可以将数据放入 pandas DataFrame 而不会出现内存错误),方法是将数据转换为 scipy csc_matrix, and then taking the gramian像这样的矩阵:
gramian = sp_csc.transpose().dot(sp_csc)
其中 sp_csc
是 scipy“压缩稀疏列矩阵”。
将 pyspark DataFrame 强制为 pandas 似乎仍然受到数据大小的限制。在 pyspark 中,有没有更好的方法来计算 gramian(pyspark DataFrame 和 pyspark DataFrame 本身的转置点积)?
更新 2
我找到了一种使原始 code/loop 运行 更快的方法。我需要在循环之前使用 df_pivot.cache()
命令缓存 df_pivot
数据帧。由于 pyspark 的延迟计算,循环导致 pyspark 在每个循环期间重新计算所有先前的计算。虽然这解决了我快速计算这个问题的迫切需要,但我仍然对有人如何使用 parallelize
、map
和 reduce
例程在 pyspark 中执行此操作感兴趣?
IIUC,你可以 unpivot 原始数据帧 df_pivot 然后从那里做一个自我 full -outer 使用 userId
加入,然后再次执行 pivot。
from pyspark.sql import functions as F
# list of columns to do pivot
cols = df_pivot.columns[1:]
# normalize the df_pivot to userId vs target
df1 = df_pivot.select(
'userId',
F.explode(F.split(F.concat_ws('|', *[F.when(F.col(c).isNotNull(), F.lit(c)) for c in cols]),'\|')).alias('target')
)
#df1.show()
#+------+--------+
#|userId| target|
#+------+--------+
#| user1|productA|
#| user1| itemB|
#| user2|productA|
#| user2| itemB|
#| user3| itemB|
#| user3|articleC|
#| user4| objectD|
#+------+--------+
# self full-outer join
df2 = df1.join(df1.withColumnRenamed('target','target_1'),'userId','full')
# pivot
df_new = df2.groupby('target') \
.pivot('target_1', cols) \
.agg(F.countDistinct('userId')) \
.fillna(0, subset=cols)
#+--------+--------+-----+--------+-------+
#| target|productA|itemB|articleC|objectD|
#+--------+--------+-----+--------+-------+
#|productA| 2| 2| 0| 0|
#| itemB| 2| 3| 1| 0|
#|articleC| 0| 1| 1| 0|
#| objectD| 0| 0| 0| 1|
#+--------+--------+-----+--------+-------+
注意:您可能只需要F.count('*')
而不是F.countDistinct('userId')
在最终聚合中根据您的实际需求。