pyspark 大矩阵运算

pyspark large matrix operations

我有一个矩阵 A,有 900 万行和 80K 列。矩阵本身非常稀疏。现在,我想得到比率

ratio = (A.T x A)/(n-((1-A).T x (1-A)))

假设用户是我的行,电影是列,我想计算看过两部电影(m1 或 m2)的人,他们看过两部电影(m1 和 m2)的百分比是多少。

我通过将 A 转换为 rdd 找到了第一部分 (A.T x A) 的解决方案。

 def coordinateMatrixMultiply(leftmatrix, rightmatrix):
        left = leftmatrix.map(lambda (i, j, v): (j, (i, v)))
        right = rightmatrix.map(lambda (j, k, w): (j, (k, w)))
        productEntries = left   \
                        .join(right)    \
                        .map(lambda (x, ((i, v), (k, w))): ((i, k), (v * w)))   \
                        .reduceByKey(lambda x,y: x+y)   \
                        .map(lambda ((i, k), sum): (i, k, sum))
        return productEntries

但是,我发现分母 (1-A).T x (1-A) 由于大小而更难求解。由于内存的原因,尝试创建一个密集的矩阵是根本不可能的。我也尝试过使用 union all 将大量数据帧连接在一起,但速度很慢。有一个更好的方法吗?谢谢!

您不需要矩阵乘积来进行此类计算:

首先让我们创建一个 0 和 1 的示例数据框:

import numpy as np
import pyspark.sql.funtions as psf
from pyspark.sql import Window

df = spark.createDataFrame(
    sc.parallelize([["id" + str(n)] + np.random.randint(0, 2, 10).tolist() for n in range(100)]), 
    ["id"] + ["movie" + str(i) for i in range(10)])
df.show()

    +----+------+------+------+------+------+------+------+------+------+------+
    |  id|movie0|movie1|movie2|movie3|movie4|movie5|movie6|movie7|movie8|movie9|
    +----+------+------+------+------+------+------+------+------+------+------+
    | id0|     1|     0|     1|     1|     1|     0|     0|     0|     0|     1|
    | id1|     0|     1|     1|     0|     0|     0|     1|     0|     1|     0|
    | id2|     1|     1|     1|     0|     0|     1|     1|     0|     0|     1|
    | id3|     1|     0|     0|     1|     0|     1|     0|     0|     0|     1|
    | id4|     0|     0|     0|     1|     1|     0|     0|     1|     0|     1|
    | id5|     1|     1|     1|     0|     1|     0|     0|     0|     1|     0|
    | id6|     0|     1|     1|     0|     1|     0|     0|     1|     0|     1|
    | id7|     1|     1|     1|     1|     0|     1|     1|     0|     1|     0|
    | id8|     0|     1|     0|     0|     1|     1|     1|     0|     1|     1|
    | id9|     1|     0|     1|     0|     0|     1|     1|     0|     1|     0|
    |id10|     1|     0|     1|     1|     0|     1|     0|     1|     0|     0|
    |id11|     1|     0|     0|     0|     0|     0|     1|     1|     0|     0|
    |id12|     0|     1|     0|     0|     1|     1|     1|     0|     1|     0|
    |id13|     0|     0|     1|     1|     1|     1|     0|     0|     0|     1|
    |id14|     1|     0|     0|     0|     0|     1|     0|     1|     0|     0|
    |id15|     1|     1|     0|     1|     0|     0|     0|     0|     0|     0|
    |id16|     1|     0|     1|     1|     0|     1|     1|     0|     1|     0|
    |id17|     1|     0|     1|     0|     0|     1|     0|     0|     1|     0|
    |id18|     1|     0|     1|     1|     1|     1|     0|     0|     0|     0|
    |id19|     0|     1|     1|     0|     0|     1|     0|     0|     1|     0|
    +----+------+------+------+------+------+------+------+------+------+------+

第一步是展开电影栏,所以我们只有 2 栏,一栏用于 id,一栏用于 movie:

df_expl = df\
    .select(
        "id", 
        psf.explode(
            psf.array(
                [psf.when(df[c] == 1, psf.lit(c)).otherwise(None) for c in df.columns if c != "id"]
            )).alias("movie"))\
    .filter("movie IS NOT NULL")

对于每个 movie,我们将使用 window 函数计算看过它的人数:

w = Window.partitionBy("movie")
df_count = df_expl.withColumn("count", psf.count("*").over(w))

我们现在将此数据框与其自身连接起来以获得电影对:

df1 = df_count.select([df_count[c].alias(c + "1") for c in df_count.columns])
df2 = df_count.select([df_count[c].alias(c + "2") for c in df_count.columns])

df12 = df1.join(df2, (df1.id1 == df2.id2) & (df1.movie1 < df2.movie2)).drop("id2")

最后,对于每一对 (movie1, movie2),我们将计算看过两者的人与看过其中之一的人的比率:

res = df12\
    .groupBy("movie1", "movie2")\
    .agg(
        psf.count("id1").alias("count12"), 
        psf.max("count1").alias("count1"), 
        psf.max("count2").alias("count2"))\
    .withColumn("pct", psf.col("count12")/(psf.col("count1") + psf.col("count2") - psf.col("count12")))
res.sort("movie1", "movie2").show()

    +------+------+-------+------+------+-------------------+
    |movie1|movie2|count12|count1|count2|                pct|
    +------+------+-------+------+------+-------------------+
    |movie0|movie1|     20|    43|    48|0.28169014084507044|
    |movie0|movie2|     24|    43|    50|0.34782608695652173|
    |movie0|movie3|     27|    43|    52|0.39705882352941174|
    |movie0|movie4|     17|    43|    47| 0.2328767123287671|
    |movie0|movie5|     18|    43|    46| 0.2535211267605634|
    |movie0|movie6|     20|    43|    44|0.29850746268656714|
    |movie0|movie7|     13|    43|    35|                0.2|
    |movie0|movie8|     20|    43|    44|0.29850746268656714|
    |movie0|movie9|     15|    43|    47|                0.2|
    |movie1|movie2|     26|    48|    50| 0.3611111111111111|
    |movie1|movie3|     25|    48|    52| 0.3333333333333333|
    |movie1|movie4|     27|    48|    47|0.39705882352941174|
    |movie1|movie5|     23|    48|    46|  0.323943661971831|
    |movie1|movie6|     25|    48|    44|  0.373134328358209|
    |movie1|movie7|     17|    48|    35|0.25757575757575757|
    |movie1|movie8|     24|    48|    44|0.35294117647058826|
    |movie1|movie9|     21|    48|    47|0.28378378378378377|
    |movie2|movie3|     25|    50|    52| 0.3246753246753247|
    |movie2|movie4|     28|    50|    47| 0.4057971014492754|
    |movie2|movie5|     27|    50|    46|  0.391304347826087|
    +------+------+-------+------+------+-------------------+