从 pyspark 中的数据框生成稀疏向量
Generate Sparse Vector from dataframe in pyspark
我有一个 DF 用户列表:
+-------+---+
| user| Id|
+-------+---+
| Peter| 1|
| Max| 2|
| Steve| 3|
| Chris| 4|
并有另一个 DF,其中包含哪个页面访问了哪个用户的信息:
+-----------+-------+
| page| user|
+-----------+-------+
| Page1| Peter|
| Page1| Max|
| Page2| Max|
| Page3| Steve|
| Page3| Chris|
| Page4| Chris|
我通过 user 字段进行了 "inner" 加入:
+-----------+-------+---+
| page| user| Id|
+-----------+-------+---+
| Page1| Peter| 1|
| Page1| Max| 2|
| Page2| Max| 2|
| Page3| Steve| 3|
| Page3| Chris| 4|
| Page4| Chris| 4|
现在要创建此 DF 的稀疏向量,如下所示:
+-------+----------------------------------------------------+
| page| features|
+-------+----------------------------------------------------+
| Page1|SparseVector({Peter: 1.0, Max: 1.0, Steve: 0.0,...})|
| Page2|SparseVector({Peter: 0.0, Max: 1.0, Steve: 0.0,...})|
| Page3|SparseVector({Peter: 0.0, Max: 0.0, Steve: 1.0,...})|
| Page4|SparseVector({Peter: 0.0, Max: 0.0, Steve: 0.0,...})|
如果该页面有用户,则设置为“1.0”;如果该页面没有用户,则设置为“0.0”
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.ml.feature import VectorAssembler
from pyspark import SparkContext
sc = SparkContext("local", "Simple App")
df = sc.parallelize([Row(PAGE="Page1", USER="Peter", USER_ID=1),
Row(PAGE="Page1", USER="Max", USER_ID=2),
Row(PAGE="Page2", USER="Max", USER_ID=2),
Row(PAGE="Page3", USER="Steve", USER_ID=3),
Row(PAGE="Page3", USER="Chris", USER_ID=4),
Row(PAGE="Page4", USER="Chris", USER_ID=4)]).toDF()
dfpivot=(df.groupBy("PAGE").pivot("USER").count().na.fill(0))
dfpivot.show()
input_cols = [x for x in dfpivot.columns if x != "PAGE"]
dfassembler1 = (VectorAssembler(inputCols=input_cols, outputCol="features").transform(dfpivot)
.select("PAGE", "features"))
dfassembler1.show()
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=3)
model = mh.fit(dfassembler1)
model.transform(dfassembler1).show(3, False)
但我遇到了一些异常:无法解析列名...并且在调用 "transform"
时出错
可能是我做错了什么。如果有任何帮助,我将不胜感激
答案在错误消息中。只需在配置中将 spark.sql.pivotMaxValues
设置为您期望的不同值的数量。
set('spark.sql.pivotMaxValues', '999999')
这是对我有用的代码
spark.conf.set("spark.sql.pivotMaxValues", 9000000)
我有一个 DF 用户列表:
+-------+---+
| user| Id|
+-------+---+
| Peter| 1|
| Max| 2|
| Steve| 3|
| Chris| 4|
并有另一个 DF,其中包含哪个页面访问了哪个用户的信息:
+-----------+-------+
| page| user|
+-----------+-------+
| Page1| Peter|
| Page1| Max|
| Page2| Max|
| Page3| Steve|
| Page3| Chris|
| Page4| Chris|
我通过 user 字段进行了 "inner" 加入:
+-----------+-------+---+
| page| user| Id|
+-----------+-------+---+
| Page1| Peter| 1|
| Page1| Max| 2|
| Page2| Max| 2|
| Page3| Steve| 3|
| Page3| Chris| 4|
| Page4| Chris| 4|
现在要创建此 DF 的稀疏向量,如下所示:
+-------+----------------------------------------------------+
| page| features|
+-------+----------------------------------------------------+
| Page1|SparseVector({Peter: 1.0, Max: 1.0, Steve: 0.0,...})|
| Page2|SparseVector({Peter: 0.0, Max: 1.0, Steve: 0.0,...})|
| Page3|SparseVector({Peter: 0.0, Max: 0.0, Steve: 1.0,...})|
| Page4|SparseVector({Peter: 0.0, Max: 0.0, Steve: 0.0,...})|
如果该页面有用户,则设置为“1.0”;如果该页面没有用户,则设置为“0.0”
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.ml.feature import VectorAssembler
from pyspark import SparkContext
sc = SparkContext("local", "Simple App")
df = sc.parallelize([Row(PAGE="Page1", USER="Peter", USER_ID=1),
Row(PAGE="Page1", USER="Max", USER_ID=2),
Row(PAGE="Page2", USER="Max", USER_ID=2),
Row(PAGE="Page3", USER="Steve", USER_ID=3),
Row(PAGE="Page3", USER="Chris", USER_ID=4),
Row(PAGE="Page4", USER="Chris", USER_ID=4)]).toDF()
dfpivot=(df.groupBy("PAGE").pivot("USER").count().na.fill(0))
dfpivot.show()
input_cols = [x for x in dfpivot.columns if x != "PAGE"]
dfassembler1 = (VectorAssembler(inputCols=input_cols, outputCol="features").transform(dfpivot)
.select("PAGE", "features"))
dfassembler1.show()
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=3)
model = mh.fit(dfassembler1)
model.transform(dfassembler1).show(3, False)
但我遇到了一些异常:无法解析列名...并且在调用 "transform"
时出错可能是我做错了什么。如果有任何帮助,我将不胜感激
答案在错误消息中。只需在配置中将 spark.sql.pivotMaxValues
设置为您期望的不同值的数量。
set('spark.sql.pivotMaxValues', '999999')
这是对我有用的代码
spark.conf.set("spark.sql.pivotMaxValues", 9000000)