如何运行 multiple k means clustering and use groupBy in pyspark

How to run multiple k means clustering and use groupBy in pyspark

我有一个这样的数据集:

|Seq_key|   |Class_id|  |value|
Seq_key 1   Class_id 1  value 1
Seq_key 1   Class_id 2  value 2
Seq_key 1   Class_id 3  value 3
Seq_key 1   Class_id 4  value 4
Seq_key 1   Class_id 5  value 5
Seq_key 1   Class_id 6  value 6
Seq_key 2   Class_id 1  value 1
Seq_key 2   Class_id 2  value 2
Seq_key 2   Class_id 3  value 3
Seq_key 2   Class_id 4  value 4
Seq_key 2   Class_id 5  value 5
Seq_key 2   Class_id 6  value 6
Seq_key 2   Class_id 7  value 7
Seq_key 3   Class_id 1  value 1
Seq_key 3   Class_id 2  value 2
Seq_key 3   Class_id 3  value 3
Seq_key 3   Class_id 4  value 4
Seq_key 3   Class_id 5  value 5
Seq_key 3   Class_id 6  value 6
Seq_key 3   Class_id 7  value 7
Seq_key 3   Class_id 8  value 8

每个 Seq_keyClass_idsvalues 是互斥的。 我为每个 Seq_key 应用 k-means 聚类,并找到最佳的簇数、质心等,使得每个 Seq_key 的输出如下:

|Seq_key|   |Class id|  |Cluster|  |Centroid|
Seq_key 1   Class_id 1     1         128
Seq_key 1   Class_id 2     2         56
Seq_key 1   Class_id 3     3         100
Seq_key 1   Class_id 4     1         128
Seq_key 1   Class_id 5     1         128
Seq_key 1   Class_id 6     4         72
Seq_key 2   Class_id 1     1         5.5
Seq_key 2   Class_id 2     1         5.5
Seq_key 2   Class_id 3     2         3.4
Seq_key 2   Class_id 4     3         1.7
Seq_key 2   Class_id 5     1         5.5
Seq_key 2   Class_id 6     2         3.4
Seq_key 2   Class_id 7     2         3.4
Seq_key 3   Class_id 1     4         500
Seq_key 3   Class_id 2     1         700
Seq_key 3   Class_id 3     3         274
Seq_key 3   Class_id 4     2         189
Seq_key 3   Class_id 5     2         189
Seq_key 3   Class_id 6     4         500
Seq_key 3   Class_id 7     1         700
Seq_key 3   Class_id 8     3         274

目前,我正在手动遍历每个 Seq_key 并应用 pyspark.ml.clustering 库中的 k-means 算法。但这显然是低效的,因为 seq_keys 的数量增加到数万。另外,我没有正确利用spark的分布式计算。

Seq_key是互斥的,所以不能和其他Seq_keys聚类 有没有办法通过 ml 库中的 groupBy 类方法实现我的输出? 即使只计算按 Seq_key 分组的质心就足够了。 这可能吗?

所以我实施了一个临时解决方案,从中获得灵感post

我收集了一个不同的 Seq_keys 列表,然后手动循环遍历每个列表并应用 pyspark kmeans 方法,如下所示:

from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.feature import VectorAssembler    
fleets=list(sens.select('SEQ_KEY').distinct().toPandas()['SEQ_KEY'])
for seqid in fleets:
    df_tmp=sens.filter(col('SEQ_ID')==seqid)\
    .select('SEQ_KEY','CLASS_ID','value')
    for c in df_tmp.columns:
        if c in FEATURE_COLS:
            df_tmp=df_tmp.withColumn(c, df_tmp[c].cast("float"))
    df_tmp=df_tmp.na.drop()
    vecAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
    vector_df = vecAssembler.transform(df_tmp)
    bkm = BisectingKMeans().setK(4).setSeed(1).setFeaturesCol("features")
    model = bkm.fit(vector_df)
    cluster=model.transform(vector_df).drop('features')
    fleet_clusters.append(cluster)

final_clustered_fleet=reduce(DataFrame.unionByName, fleet_clusters)

我暂时不考虑质心。获取集群信息即可。

这显然是肮脏且低效的。事实上,由于 kmeans 函数调用了 collect 方法,我的工作花了大约 8 个小时才达到 运行。我 90% 的工作节点处于空闲状态。 如果有一种更有效的方法来做到这一点,最好利用 spark 提供的多个工作节点,那就太好了。

您也许可以通过水平并行提高运行时间,即 运行 并行执行多个 Spark 作业,如下所示:

from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count

def run_kmeans(seqid, data=sens):

    df_tmp=data.filter(col('SEQ_ID')==seqid)\
        .select('SEQ_KEY','CLASS_ID','value')
    for c in df_tmp.columns:
        if c in FEATURE_COLS:
            df_tmp=df_tmp.withColumn(c, df_tmp[c].cast("float"))
    df_tmp=df_tmp.na.drop()
    vecAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
    vector_df = vecAssembler.transform(df_tmp)
    bkm = BisectingKMeans().setK(4).setSeed(1).setFeaturesCol("features")
    model = bkm.fit(vector_df)
    cluster=model.transform(vector_df).drop('features')

    return cluster

pool = ThreadPool(cpu_count())
fleet_clusters = pool.map(run_kmeans, fleets)