从 pyspark 中的数据框生成稀疏向量

Generate Sparse Vector from dataframe in pyspark

我有一个 DF 用户列表:

+-------+---+
|   user| Id|
+-------+---+
|  Peter|  1|
|    Max|  2|
|  Steve|  3|
|  Chris|  4|

并有另一个 DF,其中包含哪个页面访问了哪个用户的信息:

+-----------+-------+
|       page|   user|
+-----------+-------+
|      Page1|  Peter|
|      Page1|    Max|
|      Page2|    Max|
|      Page3|  Steve|
|      Page3|  Chris|
|      Page4|  Chris|

我通过 user 字段进行了 "inner" 加入:

+-----------+-------+---+
|       page|   user| Id|
+-----------+-------+---+
|      Page1|  Peter|  1| 
|      Page1|    Max|  2| 
|      Page2|    Max|  2| 
|      Page3|  Steve|  3| 
|      Page3|  Chris|  4| 
|      Page4|  Chris|  4| 

现在要创建此 DF 的稀疏向量,如下所示:

+-------+----------------------------------------------------+
|   page|                                            features|
+-------+----------------------------------------------------+
|  Page1|SparseVector({Peter: 1.0, Max: 1.0, Steve: 0.0,...})|
|  Page2|SparseVector({Peter: 0.0, Max: 1.0, Steve: 0.0,...})|
|  Page3|SparseVector({Peter: 0.0, Max: 0.0, Steve: 1.0,...})|
|  Page4|SparseVector({Peter: 0.0, Max: 0.0, Steve: 0.0,...})|

如果该页面有用户,则设置为“1.0”;如果该页面没有用户,则设置为“0.0”

from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.ml.feature import VectorAssembler
from pyspark import SparkContext
sc = SparkContext("local", "Simple App")

df = sc.parallelize([Row(PAGE="Page1", USER="Peter", USER_ID=1),
                     Row(PAGE="Page1", USER="Max", USER_ID=2),
                     Row(PAGE="Page2", USER="Max", USER_ID=2),
                     Row(PAGE="Page3", USER="Steve", USER_ID=3),
                     Row(PAGE="Page3", USER="Chris", USER_ID=4),
                     Row(PAGE="Page4", USER="Chris", USER_ID=4)]).toDF()


dfpivot=(df.groupBy("PAGE").pivot("USER").count().na.fill(0))
dfpivot.show()
input_cols = [x for x in dfpivot.columns if x != "PAGE"]


dfassembler1 = (VectorAssembler(inputCols=input_cols, outputCol="features").transform(dfpivot)
                .select("PAGE", "features"))
dfassembler1.show()

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=3)
model = mh.fit(dfassembler1)

model.transform(dfassembler1).show(3, False)

但我遇到了一些异常:无法解析列名...并且在调用 "transform"

时出错

可能是我做错了什么。如果有任何帮助,我将不胜感激

答案在错误消息中。只需在配置中将 spark.sql.pivotMaxValues 设置为您期望的不同值的数量。

set('spark.sql.pivotMaxValues', '999999')

这是对我有用的代码

spark.conf.set("spark.sql.pivotMaxValues", 9000000)