How to properly label the original observations with their predicted clusters using k-means in PySpark?
I want to understand how the k-means method works in PySpark.
To that end, I put together this small example:
In [120]: entry = [ [1,1,1],[2,2,2],[3,3,3],[4,4,4],[5,5,5],[5,5,5],[5,5,5],[1,1,1],[5,5,5]]
In [121]: rdd_entry = sc.parallelize(entry)
In [122]: clusters = KMeans.train(rdd_entry, k=5, maxIterations=10, initializationMode="random")
In [123]: rdd_labels = clusters.predict(rdd_entry)
In [125]: rdd_labels.collect()
Out[125]: [3, 1, 0, 0, 2, 2, 2, 3, 2]
In [126]: entry
Out[126]:
[[1, 1, 1],
[2, 2, 2],
[3, 3, 3],
[4, 4, 4],
[5, 5, 5],
[5, 5, 5],
[5, 5, 5],
[1, 1, 1],
[5, 5, 5]]
At first glance it seems that rdd_labels returns, for each observation, the cluster it belongs to, respecting the order of the original RDD. Although that is obvious in this example, how can I be sure of it when I am dealing with 8 million observations?
Also, I would like to know how to join rdd_entry and rdd_labels while respecting that order, so that each observation in rdd_entry is correctly labeled with its cluster.
I tried a .join(), but it throws an error:
In [127]: rdd_total = rdd_entry.join(rdd_labels)
In [128]: rdd_total.collect()
TypeError: 'int' object has no attribute '__getitem__'
Hope this helps! (This solution is based on pyspark.ml.)
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# sample data
df = sc.parallelize([[1,1,1],[2,2,2],[3,3,3],[4,4,4],[5,5,5],[5,5,5],[5,5,5],[1,1,1],[5,5,5]]) \
    .toDF(('col1','col2','col3'))

# assemble the feature columns into a single vector column
vecAssembler = VectorAssembler(inputCols=df.columns, outputCol="features")
vector_df = vecAssembler.transform(df)

# k-means clustering
kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(vector_df)
predictions = model.transform(vector_df)
predictions.show()
The output is:
+----+----+----+-------------+----------+
|col1|col2|col3| features|prediction|
+----+----+----+-------------+----------+
| 1| 1| 1|[1.0,1.0,1.0]| 0|
| 2| 2| 2|[2.0,2.0,2.0]| 0|
| 3| 3| 3|[3.0,3.0,3.0]| 2|
| 4| 4| 4|[4.0,4.0,4.0]| 1|
| 5| 5| 5|[5.0,5.0,5.0]| 1|
| 5| 5| 5|[5.0,5.0,5.0]| 1|
| 5| 5| 5|[5.0,5.0,5.0]| 1|
| 1| 1| 1|[1.0,1.0,1.0]| 0|
| 5| 5| 5|[5.0,5.0,5.0]| 1|
+----+----+----+-------------+----------+
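If you only need the original columns together with their assigned cluster, or want to look at the learned centroids, something along these lines should work (a minimal sketch built on the predictions DataFrame and model from the code above):
# keep only the original columns plus the cluster assignment
labeled_df = predictions.select('col1', 'col2', 'col3', 'prediction')
labeled_df.show()

# inspect the learned cluster centers (a list of arrays, one per cluster)
for center in model.clusterCenters():
    print(center)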
Although pyspark.ml offers a better approach, I also thought of writing code with pyspark.mllib to achieve the same result (the trigger was @Muhammad's comment). So here is the solution based on pyspark.mllib...
from pyspark.mllib.clustering import KMeans
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

# sample data
rdd = sc.parallelize([[1,1,1],[2,2,2],[3,3,3],[4,4,4],[5,5,5],[5,5,5],[5,5,5],[1,1,1],[5,5,5]])

# k-means example
model = KMeans.train(rdd, k=3, seed=1)
labels = model.predict(rdd)

# add the cluster label to the original data by joining on a generated row index
df1 = rdd.toDF(('col1', 'col2', 'col3')) \
    .withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
df2 = spark.createDataFrame(labels, IntegerType()).toDF('label') \
    .withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
df = df1.join(df2, on=['row_index']).drop('row_index')
df.show()
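Regarding the original concern about ordering: predict() produces one label per input point in the same order, so another option is to zip the input RDD with the label RDD by position instead of generating a row index. This is a minimal sketch, assuming the same rdd and model as above, and assuming both RDDs keep identical partitioning and element counts per partition (which holds here because labels is derived from rdd by a map-like transformation):
# pair each original observation with its predicted cluster, by position
labeled_rdd = rdd.zip(model.predict(rdd))

# optionally flatten into a DataFrame with the same columns as before
labeled_df = labeled_rdd.map(lambda x: (x[0][0], x[0][1], x[0][2], x[1])) \
    .toDF(('col1', 'col2', 'col3', 'label'))
labeled_df.show()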