在 pyspark 中使用基于 DataFrame API 的 2 个稀疏向量列表之间进行矩阵乘法的最佳方法是什么?
What's the best way to do matrix multiplication between 2 lists of sparseVectors with DataFrame-based API in pyspark?
我有 2 个 DataFrame
具有相同的结构:DataFrame[id: bigint, tfidf_features: vector]
我需要 dataframe1
中的多行与 dataframe2
中的行。我可以使用循环并执行以下操作:
dataframe1.collect()[i]['tfidf_features'].dot(dataframe2.collect()[j]['tfidf_features'])
.
但是,我想使用矩阵乘法,相当于:np.matmul(dataframe1_tfidf_features, dataframe2_tfidf_features.T)
。
你有两个选择
1. mllib.linalg.distributed.BlockMatrix
将两个数据帧转换为块矩阵并使用 mulitply
bm1 = IndexedRowMatrix(df1.rdd.map(lambda x: IndexedRow(x[0], x[1]))).toBlockMatrix()
bm2 = IndexedRowMatrix(df2.rdd.map(lambda x: IndexedRow(x[0], x[1]))).toBlockMatrix()
bm_result = bm1.multiply(bm2)
2。 pyspark.sql.dataframe.crossJoin
交叉连接两个数据帧并计算结果矩阵的单个元素,然后使用 collect_list 和排序
arr = np.array
df =df1.crossJoin(df2.select(col("id").alias("id2"),
col("features").alias("features2"))
udf_mult = udf(lambda x,y = float(arr(x).dot(arr(y).T).sum()), DoubleType())
df = df.withColumn("val", udf_mult("features","features2")).
drop("features","features2")
st = struct(["id2","val"]).alias("map")
df = df.select("id", st).groupBy("id").agg(collect_list("map").alias("list"))
def sort(x):
x = sorted(x, key=lambda x:x[0])
y = list(map(lambda a:a[1], x))
return(y)
udf_sort = udf(sort, ArrayType(DoubleType()))
df = df.withColumn("list", udf_sort("list"))
我有 2 个 DataFrame
具有相同的结构:DataFrame[id: bigint, tfidf_features: vector]
我需要 dataframe1
中的多行与 dataframe2
中的行。我可以使用循环并执行以下操作:
dataframe1.collect()[i]['tfidf_features'].dot(dataframe2.collect()[j]['tfidf_features'])
.
但是,我想使用矩阵乘法,相当于:np.matmul(dataframe1_tfidf_features, dataframe2_tfidf_features.T)
。
你有两个选择
1. mllib.linalg.distributed.BlockMatrix
将两个数据帧转换为块矩阵并使用 mulitply
bm1 = IndexedRowMatrix(df1.rdd.map(lambda x: IndexedRow(x[0], x[1]))).toBlockMatrix()
bm2 = IndexedRowMatrix(df2.rdd.map(lambda x: IndexedRow(x[0], x[1]))).toBlockMatrix()
bm_result = bm1.multiply(bm2)
2。 pyspark.sql.dataframe.crossJoin
交叉连接两个数据帧并计算结果矩阵的单个元素,然后使用 collect_list 和排序
arr = np.array
df =df1.crossJoin(df2.select(col("id").alias("id2"),
col("features").alias("features2"))
udf_mult = udf(lambda x,y = float(arr(x).dot(arr(y).T).sum()), DoubleType())
df = df.withColumn("val", udf_mult("features","features2")).
drop("features","features2")
st = struct(["id2","val"]).alias("map")
df = df.select("id", st).groupBy("id").agg(collect_list("map").alias("list"))
def sort(x):
x = sorted(x, key=lambda x:x[0])
y = list(map(lambda a:a[1], x))
return(y)
udf_sort = udf(sort, ArrayType(DoubleType()))
df = df.withColumn("list", udf_sort("list"))