pyspark 大矩阵运算
pyspark large matrix operations
我有一个矩阵 A,有 900 万行和 80K 列。矩阵本身非常稀疏。现在,我想得到比率
ratio = (A.T x A)/(n-((1-A).T x (1-A)))
假设用户是我的行,电影是列,我想计算看过两部电影(m1 或 m2)的人,他们看过两部电影(m1 和 m2)的百分比是多少。
我通过将 A 转换为 rdd 找到了第一部分 (A.T x A) 的解决方案。
def coordinateMatrixMultiply(leftmatrix, rightmatrix):
left = leftmatrix.map(lambda (i, j, v): (j, (i, v)))
right = rightmatrix.map(lambda (j, k, w): (j, (k, w)))
productEntries = left \
.join(right) \
.map(lambda (x, ((i, v), (k, w))): ((i, k), (v * w))) \
.reduceByKey(lambda x,y: x+y) \
.map(lambda ((i, k), sum): (i, k, sum))
return productEntries
但是,我发现分母 (1-A).T x (1-A) 由于大小而更难求解。由于内存的原因,尝试创建一个密集的矩阵是根本不可能的。我也尝试过使用 union all 将大量数据帧连接在一起,但速度很慢。有一个更好的方法吗?谢谢!
您不需要矩阵乘积来进行此类计算:
首先让我们创建一个 0 和 1 的示例数据框:
import numpy as np
import pyspark.sql.funtions as psf
from pyspark.sql import Window
df = spark.createDataFrame(
sc.parallelize([["id" + str(n)] + np.random.randint(0, 2, 10).tolist() for n in range(100)]),
["id"] + ["movie" + str(i) for i in range(10)])
df.show()
+----+------+------+------+------+------+------+------+------+------+------+
| id|movie0|movie1|movie2|movie3|movie4|movie5|movie6|movie7|movie8|movie9|
+----+------+------+------+------+------+------+------+------+------+------+
| id0| 1| 0| 1| 1| 1| 0| 0| 0| 0| 1|
| id1| 0| 1| 1| 0| 0| 0| 1| 0| 1| 0|
| id2| 1| 1| 1| 0| 0| 1| 1| 0| 0| 1|
| id3| 1| 0| 0| 1| 0| 1| 0| 0| 0| 1|
| id4| 0| 0| 0| 1| 1| 0| 0| 1| 0| 1|
| id5| 1| 1| 1| 0| 1| 0| 0| 0| 1| 0|
| id6| 0| 1| 1| 0| 1| 0| 0| 1| 0| 1|
| id7| 1| 1| 1| 1| 0| 1| 1| 0| 1| 0|
| id8| 0| 1| 0| 0| 1| 1| 1| 0| 1| 1|
| id9| 1| 0| 1| 0| 0| 1| 1| 0| 1| 0|
|id10| 1| 0| 1| 1| 0| 1| 0| 1| 0| 0|
|id11| 1| 0| 0| 0| 0| 0| 1| 1| 0| 0|
|id12| 0| 1| 0| 0| 1| 1| 1| 0| 1| 0|
|id13| 0| 0| 1| 1| 1| 1| 0| 0| 0| 1|
|id14| 1| 0| 0| 0| 0| 1| 0| 1| 0| 0|
|id15| 1| 1| 0| 1| 0| 0| 0| 0| 0| 0|
|id16| 1| 0| 1| 1| 0| 1| 1| 0| 1| 0|
|id17| 1| 0| 1| 0| 0| 1| 0| 0| 1| 0|
|id18| 1| 0| 1| 1| 1| 1| 0| 0| 0| 0|
|id19| 0| 1| 1| 0| 0| 1| 0| 0| 1| 0|
+----+------+------+------+------+------+------+------+------+------+------+
第一步是展开电影栏,所以我们只有 2 栏,一栏用于 id
,一栏用于 movie
:
df_expl = df\
.select(
"id",
psf.explode(
psf.array(
[psf.when(df[c] == 1, psf.lit(c)).otherwise(None) for c in df.columns if c != "id"]
)).alias("movie"))\
.filter("movie IS NOT NULL")
对于每个 movie
,我们将使用 window
函数计算看过它的人数:
w = Window.partitionBy("movie")
df_count = df_expl.withColumn("count", psf.count("*").over(w))
我们现在将此数据框与其自身连接起来以获得电影对:
df1 = df_count.select([df_count[c].alias(c + "1") for c in df_count.columns])
df2 = df_count.select([df_count[c].alias(c + "2") for c in df_count.columns])
df12 = df1.join(df2, (df1.id1 == df2.id2) & (df1.movie1 < df2.movie2)).drop("id2")
最后,对于每一对 (movie1, movie2)
,我们将计算看过两者的人与看过其中之一的人的比率:
res = df12\
.groupBy("movie1", "movie2")\
.agg(
psf.count("id1").alias("count12"),
psf.max("count1").alias("count1"),
psf.max("count2").alias("count2"))\
.withColumn("pct", psf.col("count12")/(psf.col("count1") + psf.col("count2") - psf.col("count12")))
res.sort("movie1", "movie2").show()
+------+------+-------+------+------+-------------------+
|movie1|movie2|count12|count1|count2| pct|
+------+------+-------+------+------+-------------------+
|movie0|movie1| 20| 43| 48|0.28169014084507044|
|movie0|movie2| 24| 43| 50|0.34782608695652173|
|movie0|movie3| 27| 43| 52|0.39705882352941174|
|movie0|movie4| 17| 43| 47| 0.2328767123287671|
|movie0|movie5| 18| 43| 46| 0.2535211267605634|
|movie0|movie6| 20| 43| 44|0.29850746268656714|
|movie0|movie7| 13| 43| 35| 0.2|
|movie0|movie8| 20| 43| 44|0.29850746268656714|
|movie0|movie9| 15| 43| 47| 0.2|
|movie1|movie2| 26| 48| 50| 0.3611111111111111|
|movie1|movie3| 25| 48| 52| 0.3333333333333333|
|movie1|movie4| 27| 48| 47|0.39705882352941174|
|movie1|movie5| 23| 48| 46| 0.323943661971831|
|movie1|movie6| 25| 48| 44| 0.373134328358209|
|movie1|movie7| 17| 48| 35|0.25757575757575757|
|movie1|movie8| 24| 48| 44|0.35294117647058826|
|movie1|movie9| 21| 48| 47|0.28378378378378377|
|movie2|movie3| 25| 50| 52| 0.3246753246753247|
|movie2|movie4| 28| 50| 47| 0.4057971014492754|
|movie2|movie5| 27| 50| 46| 0.391304347826087|
+------+------+-------+------+------+-------------------+
我有一个矩阵 A,有 900 万行和 80K 列。矩阵本身非常稀疏。现在,我想得到比率
ratio = (A.T x A)/(n-((1-A).T x (1-A)))
假设用户是我的行,电影是列,我想计算看过两部电影(m1 或 m2)的人,他们看过两部电影(m1 和 m2)的百分比是多少。
我通过将 A 转换为 rdd 找到了第一部分 (A.T x A) 的解决方案。
def coordinateMatrixMultiply(leftmatrix, rightmatrix):
left = leftmatrix.map(lambda (i, j, v): (j, (i, v)))
right = rightmatrix.map(lambda (j, k, w): (j, (k, w)))
productEntries = left \
.join(right) \
.map(lambda (x, ((i, v), (k, w))): ((i, k), (v * w))) \
.reduceByKey(lambda x,y: x+y) \
.map(lambda ((i, k), sum): (i, k, sum))
return productEntries
但是,我发现分母 (1-A).T x (1-A) 由于大小而更难求解。由于内存的原因,尝试创建一个密集的矩阵是根本不可能的。我也尝试过使用 union all 将大量数据帧连接在一起,但速度很慢。有一个更好的方法吗?谢谢!
您不需要矩阵乘积来进行此类计算:
首先让我们创建一个 0 和 1 的示例数据框:
import numpy as np
import pyspark.sql.funtions as psf
from pyspark.sql import Window
df = spark.createDataFrame(
sc.parallelize([["id" + str(n)] + np.random.randint(0, 2, 10).tolist() for n in range(100)]),
["id"] + ["movie" + str(i) for i in range(10)])
df.show()
+----+------+------+------+------+------+------+------+------+------+------+
| id|movie0|movie1|movie2|movie3|movie4|movie5|movie6|movie7|movie8|movie9|
+----+------+------+------+------+------+------+------+------+------+------+
| id0| 1| 0| 1| 1| 1| 0| 0| 0| 0| 1|
| id1| 0| 1| 1| 0| 0| 0| 1| 0| 1| 0|
| id2| 1| 1| 1| 0| 0| 1| 1| 0| 0| 1|
| id3| 1| 0| 0| 1| 0| 1| 0| 0| 0| 1|
| id4| 0| 0| 0| 1| 1| 0| 0| 1| 0| 1|
| id5| 1| 1| 1| 0| 1| 0| 0| 0| 1| 0|
| id6| 0| 1| 1| 0| 1| 0| 0| 1| 0| 1|
| id7| 1| 1| 1| 1| 0| 1| 1| 0| 1| 0|
| id8| 0| 1| 0| 0| 1| 1| 1| 0| 1| 1|
| id9| 1| 0| 1| 0| 0| 1| 1| 0| 1| 0|
|id10| 1| 0| 1| 1| 0| 1| 0| 1| 0| 0|
|id11| 1| 0| 0| 0| 0| 0| 1| 1| 0| 0|
|id12| 0| 1| 0| 0| 1| 1| 1| 0| 1| 0|
|id13| 0| 0| 1| 1| 1| 1| 0| 0| 0| 1|
|id14| 1| 0| 0| 0| 0| 1| 0| 1| 0| 0|
|id15| 1| 1| 0| 1| 0| 0| 0| 0| 0| 0|
|id16| 1| 0| 1| 1| 0| 1| 1| 0| 1| 0|
|id17| 1| 0| 1| 0| 0| 1| 0| 0| 1| 0|
|id18| 1| 0| 1| 1| 1| 1| 0| 0| 0| 0|
|id19| 0| 1| 1| 0| 0| 1| 0| 0| 1| 0|
+----+------+------+------+------+------+------+------+------+------+------+
第一步是展开电影栏,所以我们只有 2 栏,一栏用于 id
,一栏用于 movie
:
df_expl = df\
.select(
"id",
psf.explode(
psf.array(
[psf.when(df[c] == 1, psf.lit(c)).otherwise(None) for c in df.columns if c != "id"]
)).alias("movie"))\
.filter("movie IS NOT NULL")
对于每个 movie
,我们将使用 window
函数计算看过它的人数:
w = Window.partitionBy("movie")
df_count = df_expl.withColumn("count", psf.count("*").over(w))
我们现在将此数据框与其自身连接起来以获得电影对:
df1 = df_count.select([df_count[c].alias(c + "1") for c in df_count.columns])
df2 = df_count.select([df_count[c].alias(c + "2") for c in df_count.columns])
df12 = df1.join(df2, (df1.id1 == df2.id2) & (df1.movie1 < df2.movie2)).drop("id2")
最后,对于每一对 (movie1, movie2)
,我们将计算看过两者的人与看过其中之一的人的比率:
res = df12\
.groupBy("movie1", "movie2")\
.agg(
psf.count("id1").alias("count12"),
psf.max("count1").alias("count1"),
psf.max("count2").alias("count2"))\
.withColumn("pct", psf.col("count12")/(psf.col("count1") + psf.col("count2") - psf.col("count12")))
res.sort("movie1", "movie2").show()
+------+------+-------+------+------+-------------------+
|movie1|movie2|count12|count1|count2| pct|
+------+------+-------+------+------+-------------------+
|movie0|movie1| 20| 43| 48|0.28169014084507044|
|movie0|movie2| 24| 43| 50|0.34782608695652173|
|movie0|movie3| 27| 43| 52|0.39705882352941174|
|movie0|movie4| 17| 43| 47| 0.2328767123287671|
|movie0|movie5| 18| 43| 46| 0.2535211267605634|
|movie0|movie6| 20| 43| 44|0.29850746268656714|
|movie0|movie7| 13| 43| 35| 0.2|
|movie0|movie8| 20| 43| 44|0.29850746268656714|
|movie0|movie9| 15| 43| 47| 0.2|
|movie1|movie2| 26| 48| 50| 0.3611111111111111|
|movie1|movie3| 25| 48| 52| 0.3333333333333333|
|movie1|movie4| 27| 48| 47|0.39705882352941174|
|movie1|movie5| 23| 48| 46| 0.323943661971831|
|movie1|movie6| 25| 48| 44| 0.373134328358209|
|movie1|movie7| 17| 48| 35|0.25757575757575757|
|movie1|movie8| 24| 48| 44|0.35294117647058826|
|movie1|movie9| 21| 48| 47|0.28378378378378377|
|movie2|movie3| 25| 50| 52| 0.3246753246753247|
|movie2|movie4| 28| 50| 47| 0.4057971014492754|
|movie2|movie5| 27| 50| 46| 0.391304347826087|
+------+------+-------+------+------+-------------------+