如何运行 multiple k means clustering and use groupBy in pyspark
How to run multiple k means clustering and use groupBy in pyspark
我有一个这样的数据集:
|Seq_key| |Class_id| |value|
Seq_key 1 Class_id 1 value 1
Seq_key 1 Class_id 2 value 2
Seq_key 1 Class_id 3 value 3
Seq_key 1 Class_id 4 value 4
Seq_key 1 Class_id 5 value 5
Seq_key 1 Class_id 6 value 6
Seq_key 2 Class_id 1 value 1
Seq_key 2 Class_id 2 value 2
Seq_key 2 Class_id 3 value 3
Seq_key 2 Class_id 4 value 4
Seq_key 2 Class_id 5 value 5
Seq_key 2 Class_id 6 value 6
Seq_key 2 Class_id 7 value 7
Seq_key 3 Class_id 1 value 1
Seq_key 3 Class_id 2 value 2
Seq_key 3 Class_id 3 value 3
Seq_key 3 Class_id 4 value 4
Seq_key 3 Class_id 5 value 5
Seq_key 3 Class_id 6 value 6
Seq_key 3 Class_id 7 value 7
Seq_key 3 Class_id 8 value 8
每个 Seq_key
的 Class_ids
和 values
是互斥的。
我为每个 Seq_key
应用 k-means 聚类,并找到最佳的簇数、质心等,使得每个 Seq_key
的输出如下:
|Seq_key| |Class id| |Cluster| |Centroid|
Seq_key 1 Class_id 1 1 128
Seq_key 1 Class_id 2 2 56
Seq_key 1 Class_id 3 3 100
Seq_key 1 Class_id 4 1 128
Seq_key 1 Class_id 5 1 128
Seq_key 1 Class_id 6 4 72
Seq_key 2 Class_id 1 1 5.5
Seq_key 2 Class_id 2 1 5.5
Seq_key 2 Class_id 3 2 3.4
Seq_key 2 Class_id 4 3 1.7
Seq_key 2 Class_id 5 1 5.5
Seq_key 2 Class_id 6 2 3.4
Seq_key 2 Class_id 7 2 3.4
Seq_key 3 Class_id 1 4 500
Seq_key 3 Class_id 2 1 700
Seq_key 3 Class_id 3 3 274
Seq_key 3 Class_id 4 2 189
Seq_key 3 Class_id 5 2 189
Seq_key 3 Class_id 6 4 500
Seq_key 3 Class_id 7 1 700
Seq_key 3 Class_id 8 3 274
目前,我正在手动遍历每个 Seq_key
并应用 pyspark.ml.clustering
库中的 k-means 算法。但这显然是低效的,因为 seq_keys
的数量增加到数万。另外,我没有正确利用spark的分布式计算。
Seq_key
是互斥的,所以不能和其他Seq_keys
聚类
有没有办法通过 ml
库中的 groupBy
类方法实现我的输出?
即使只计算按 Seq_key
分组的质心就足够了。
这可能吗?
所以我实施了一个临时解决方案,从中获得灵感post。
我收集了一个不同的 Seq_keys
列表,然后手动循环遍历每个列表并应用 pyspark kmeans 方法,如下所示:
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.feature import VectorAssembler
fleets=list(sens.select('SEQ_KEY').distinct().toPandas()['SEQ_KEY'])
for seqid in fleets:
df_tmp=sens.filter(col('SEQ_ID')==seqid)\
.select('SEQ_KEY','CLASS_ID','value')
for c in df_tmp.columns:
if c in FEATURE_COLS:
df_tmp=df_tmp.withColumn(c, df_tmp[c].cast("float"))
df_tmp=df_tmp.na.drop()
vecAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
vector_df = vecAssembler.transform(df_tmp)
bkm = BisectingKMeans().setK(4).setSeed(1).setFeaturesCol("features")
model = bkm.fit(vector_df)
cluster=model.transform(vector_df).drop('features')
fleet_clusters.append(cluster)
final_clustered_fleet=reduce(DataFrame.unionByName, fleet_clusters)
我暂时不考虑质心。获取集群信息即可。
这显然是肮脏且低效的。事实上,由于 kmeans 函数调用了 collect
方法,我的工作花了大约 8 个小时才达到 运行。我 90% 的工作节点处于空闲状态。
如果有一种更有效的方法来做到这一点,最好利用 spark 提供的多个工作节点,那就太好了。
您也许可以通过水平并行提高运行时间,即 运行 并行执行多个 Spark 作业,如下所示:
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
def run_kmeans(seqid, data=sens):
df_tmp=data.filter(col('SEQ_ID')==seqid)\
.select('SEQ_KEY','CLASS_ID','value')
for c in df_tmp.columns:
if c in FEATURE_COLS:
df_tmp=df_tmp.withColumn(c, df_tmp[c].cast("float"))
df_tmp=df_tmp.na.drop()
vecAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
vector_df = vecAssembler.transform(df_tmp)
bkm = BisectingKMeans().setK(4).setSeed(1).setFeaturesCol("features")
model = bkm.fit(vector_df)
cluster=model.transform(vector_df).drop('features')
return cluster
pool = ThreadPool(cpu_count())
fleet_clusters = pool.map(run_kmeans, fleets)
我有一个这样的数据集:
|Seq_key| |Class_id| |value|
Seq_key 1 Class_id 1 value 1
Seq_key 1 Class_id 2 value 2
Seq_key 1 Class_id 3 value 3
Seq_key 1 Class_id 4 value 4
Seq_key 1 Class_id 5 value 5
Seq_key 1 Class_id 6 value 6
Seq_key 2 Class_id 1 value 1
Seq_key 2 Class_id 2 value 2
Seq_key 2 Class_id 3 value 3
Seq_key 2 Class_id 4 value 4
Seq_key 2 Class_id 5 value 5
Seq_key 2 Class_id 6 value 6
Seq_key 2 Class_id 7 value 7
Seq_key 3 Class_id 1 value 1
Seq_key 3 Class_id 2 value 2
Seq_key 3 Class_id 3 value 3
Seq_key 3 Class_id 4 value 4
Seq_key 3 Class_id 5 value 5
Seq_key 3 Class_id 6 value 6
Seq_key 3 Class_id 7 value 7
Seq_key 3 Class_id 8 value 8
每个 Seq_key
的 Class_ids
和 values
是互斥的。
我为每个 Seq_key
应用 k-means 聚类,并找到最佳的簇数、质心等,使得每个 Seq_key
的输出如下:
|Seq_key| |Class id| |Cluster| |Centroid|
Seq_key 1 Class_id 1 1 128
Seq_key 1 Class_id 2 2 56
Seq_key 1 Class_id 3 3 100
Seq_key 1 Class_id 4 1 128
Seq_key 1 Class_id 5 1 128
Seq_key 1 Class_id 6 4 72
Seq_key 2 Class_id 1 1 5.5
Seq_key 2 Class_id 2 1 5.5
Seq_key 2 Class_id 3 2 3.4
Seq_key 2 Class_id 4 3 1.7
Seq_key 2 Class_id 5 1 5.5
Seq_key 2 Class_id 6 2 3.4
Seq_key 2 Class_id 7 2 3.4
Seq_key 3 Class_id 1 4 500
Seq_key 3 Class_id 2 1 700
Seq_key 3 Class_id 3 3 274
Seq_key 3 Class_id 4 2 189
Seq_key 3 Class_id 5 2 189
Seq_key 3 Class_id 6 4 500
Seq_key 3 Class_id 7 1 700
Seq_key 3 Class_id 8 3 274
目前,我正在手动遍历每个 Seq_key
并应用 pyspark.ml.clustering
库中的 k-means 算法。但这显然是低效的,因为 seq_keys
的数量增加到数万。另外,我没有正确利用spark的分布式计算。
Seq_key
是互斥的,所以不能和其他Seq_keys
聚类
有没有办法通过 ml
库中的 groupBy
类方法实现我的输出?
即使只计算按 Seq_key
分组的质心就足够了。
这可能吗?
所以我实施了一个临时解决方案,从中获得灵感post。
我收集了一个不同的 Seq_keys
列表,然后手动循环遍历每个列表并应用 pyspark kmeans 方法,如下所示:
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.feature import VectorAssembler
fleets=list(sens.select('SEQ_KEY').distinct().toPandas()['SEQ_KEY'])
for seqid in fleets:
df_tmp=sens.filter(col('SEQ_ID')==seqid)\
.select('SEQ_KEY','CLASS_ID','value')
for c in df_tmp.columns:
if c in FEATURE_COLS:
df_tmp=df_tmp.withColumn(c, df_tmp[c].cast("float"))
df_tmp=df_tmp.na.drop()
vecAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
vector_df = vecAssembler.transform(df_tmp)
bkm = BisectingKMeans().setK(4).setSeed(1).setFeaturesCol("features")
model = bkm.fit(vector_df)
cluster=model.transform(vector_df).drop('features')
fleet_clusters.append(cluster)
final_clustered_fleet=reduce(DataFrame.unionByName, fleet_clusters)
我暂时不考虑质心。获取集群信息即可。
这显然是肮脏且低效的。事实上,由于 kmeans 函数调用了 collect
方法,我的工作花了大约 8 个小时才达到 运行。我 90% 的工作节点处于空闲状态。
如果有一种更有效的方法来做到这一点,最好利用 spark 提供的多个工作节点,那就太好了。
您也许可以通过水平并行提高运行时间,即 运行 并行执行多个 Spark 作业,如下所示:
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
def run_kmeans(seqid, data=sens):
df_tmp=data.filter(col('SEQ_ID')==seqid)\
.select('SEQ_KEY','CLASS_ID','value')
for c in df_tmp.columns:
if c in FEATURE_COLS:
df_tmp=df_tmp.withColumn(c, df_tmp[c].cast("float"))
df_tmp=df_tmp.na.drop()
vecAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
vector_df = vecAssembler.transform(df_tmp)
bkm = BisectingKMeans().setK(4).setSeed(1).setFeaturesCol("features")
model = bkm.fit(vector_df)
cluster=model.transform(vector_df).drop('features')
return cluster
pool = ThreadPool(cpu_count())
fleet_clusters = pool.map(run_kmeans, fleets)