Aggregate sparse vector in PySpark
I have a Hive table that contains text data and some metadata associated with each document. It looks like this.
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import CountVectorizer
df = sc.parallelize([
("1", "doc_1", "fruit is good for you"),
("2", "doc_2", "you should eat fruit and veggies"),
("2", "doc_3", "kids eat fruit but not veggies")
]).toDF(["month","doc_id", "text"])
+-----+------+--------------------+
|month|doc_id| text|
+-----+------+--------------------+
| 1| doc_1|fruit is good for...|
| 2| doc_2|you should eat fr...|
| 2| doc_3|kids eat fruit bu...|
+-----+------+--------------------+
I want to count words by month.
So far I have taken a CountVectorizer approach:
tokenizer = Tokenizer().setInputCol("text").setOutputCol("words")
tokenized = tokenizer.transform(df)
cvModel = CountVectorizer().setInputCol("words").setOutputCol("features").fit(tokenized)
counted = cvModel.transform(tokenized)
+-----+------+--------------------+--------------------+--------------------+
|month|doc_id| text| words| features|
+-----+------+--------------------+--------------------+--------------------+
| 1| doc_1|fruit is good for...|[fruit, is, good,...|(12,[0,3,4,7,8],[...|
| 2| doc_2|you should eat fr...|[you, should, eat...|(12,[0,1,2,3,9,11...|
| 2| doc_3|kids eat fruit bu...|[kids, eat, fruit...|(12,[0,1,2,5,6,10...|
+-----+------+--------------------+--------------------+--------------------+
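(Each features entry is a SparseVector of the form (size, [indices], [values]), where the indices point into cvModel.vocabulary. A quick sketch for inspecting that mapping on the first row — the vocabulary order may differ between runs:)
# The indices in the sparse vectors refer to positions in the fitted vocabulary.
print(cvModel.vocabulary)
row = counted.first()
print([(cvModel.vocabulary[int(i)], v)
       for i, v in zip(row["features"].indices, row["features"].values)])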
Now I want to group by month and return something that looks like:
month word count
1 fruit 1
1 is 1
...
2 fruit 2
2 kids 1
2 eat 2
...
How could I do that?
There is no built-in mechanism for Vector* aggregation, but you don't need one here. Once you have the tokenized data, you can just explode and aggregate:
from pyspark.sql.functions import explode
(counted
.select("month", explode("words").alias("word"))
.groupBy("month", "word")
.count())
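As a quick sanity check against the sample data above, the grouped result can be ordered and printed; a sketch (output formatting may differ):
from pyspark.sql.functions import explode

word_counts = (counted
    .select("month", explode("words").alias("word"))
    .groupBy("month", "word")
    .count())

# For the sample data this yields rows such as
# (1, fruit, 1), (2, fruit, 2), (2, eat, 2), (2, kids, 1), ...
word_counts.orderBy("month", "word").show()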
If you prefer to limit the results to the vocabulary, just add a filter:
from pyspark.sql.functions import col
(counted
.select("month", explode("words").alias("word"))
.where(col("word").isin(cvModel.vocabulary))
.groupBy("month", "word")
.count())
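Note that with the defaults used here (no minDF or vocabSize limits on CountVectorizer) every token already ends up in the vocabulary, so the filter only matters when the model was fitted with stricter settings. And if only the per-month word counts are needed, the ML stages can be skipped entirely; a minimal sketch assuming whitespace tokenization, which is what Tokenizer does after lower-casing:
from pyspark.sql.functions import explode, lower, split

# Same per-month counts straight from the raw text, without Tokenizer /
# CountVectorizer: lower-case, split on whitespace, explode, group, count.
(df
 .select("month", explode(split(lower("text"), "\\s+")).alias("word"))
 .groupBy("month", "word")
 .count())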
* As of Spark 2.4 … but it is of no use here.
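For completeness, if the sparse features vectors themselves had to be summed per month (what the title literally asks for), one way without a built-in aggregator is to densify them, reduce by key, and map vector positions back through cvModel.vocabulary; a sketch assuming the vocabulary is small enough to collect the sums to the driver:
import numpy as np

# Sum the dense form of each document's feature vector per month, then
# translate vector positions back into words via cvModel.vocabulary.
summed = (counted
    .select("month", "features")
    .rdd
    .map(lambda row: (row["month"], row["features"].toArray()))
    .reduceByKey(lambda a, b: a + b)
    .collect())

for month, counts in summed:
    for word, count in zip(cvModel.vocabulary, counts):
        if count > 0:
            print(month, word, int(count))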