How can I take n samples/events from all predicted clusters and return them as a dataframe in PySpark?

I am following this tutorial and trying to pick/select the top n events, say 10 events/rows, per predicted cluster after assigning the predicted clusters to the main df, then merge them and report the result in the form of a Spark dataframe. Assume our main dataframe df1 contains the following 3 features:

+-----+------------+----------+----------+
|   id|           x|         y|         z|
+-----+------------+----------+----------+
| row0|  -6.0776997|-2.9096103|-1.5181729|
| row1|  -1.0122601|  7.322841|-5.4424076|
| row2|   -8.297007| 6.3228936| 1.1672047|
| row3|  -3.5071216|  4.784812|-5.4449472|
| row4|   -5.122823|-3.3220499|-0.5069805|
| row5|  -2.4764006|  8.255791|  4.409478|
| row6|   7.3153954| -5.079449| -7.291215|
| row7|  -2.0167463|  9.303454|  7.095179|
| row8|  -0.2338185| -4.892681| 2.1228876|
| row9|    6.565442| -6.855994|-6.7983212|
|row10|  -5.6902847|-6.4827404|-0.9246967|
|row11|-0.017986143| 2.7632365| -8.814824|
|row12|  -6.9042625|-6.1491723|-3.5354295|
|row13|  -10.389865|  9.537853|  0.674591|
|row14|   3.9688683|-6.0467844| -5.462389|
|row15|   -7.337052|-3.7689247| -5.261122|
|row16|   -8.991589|  8.738728|  3.864116|
|row17| -0.18098584|  5.482743| -4.900118|
|row18|   3.3193955|-6.3573766| -6.978025|
|row19|  -2.0266335|-3.4171724|0.48218703|
+-----+------------+----------+----------+
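For context, the snippets below assume a fitted clustering model (model) and a dataframe with the predicted labels attached (pddf_pred in the question, df_pred in the answer at the end). A minimal sketch of how such a model could be produced with pyspark.ml KMeans on the x, y, z columns above (k=10 and the variable names here are assumptions, not part of the original post):

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# assemble the raw feature columns into the single vector column expected by KMeans
assembler = VectorAssembler(inputCols=["x", "y", "z"], outputCol="features")
df_features = assembler.transform(df1)

# fit KMeans; k=10 is assumed to match the report below
kmeans = KMeans(featuresCol="features", predictionCol="prediction", k=10, seed=1)
model = kmeans.fit(df_features)

# attach the predicted cluster label to every row of the main dataframe
pddf_pred = model.transform(df_features).select("id", "prediction", "x", "y", "z")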

Now I get the following information from the clustering algorithm in the form of dataframe df2:

print("==========================Short report==================================== ")

n_clusters = model.summary.k
#n_clusters
print("Number of predicted clusters: " + str(n_clusters))

cluster_Sizes = model.summary.clusterSizes
#cluster_Sizes 

col = ['size']
df2 = pd.DataFrame(cluster_Sizes, columns=col).sort_values(by=['size'], ascending=True)  #sorting
cluster_Sizes = df2["size"].unique()
print("Size of predicted clusters: " + str(cluster_Sizes))
clusterSizes

#==========================Short report==================================== 
#Number of predicted clusters: 10
#Size of predicted clusters: [ 486  496  504  529  985  998  999 1003 2000]

+----------+----+
|prediction|size|
+----------+----+
|         2| 486|
|         6| 496|
|         0| 504|
|         8| 529|
|         5| 985|
|         9| 998|
|         7| 999|
|         3|1003|
|         1|2000|
|         4|2000|
+----------+----+

So here, the index column is the predicted cluster label and the size column is the cluster size. I can assign the predicted cluster labels to the main dataframe as shown below, but not the cluster sizes (one way to attach the sizes is sketched right after the table):

+-----+----------+------------+----------+----------+
|   id|prediction|           x|         y|         z|
+-----+----------+------------+----------+----------+
| row0|         9|  -6.0776997|-2.9096103|-1.5181729|
| row1|         4|  -1.0122601|  7.322841|-5.4424076|
| row2|         1|   -8.297007| 6.3228936| 1.1672047|
| row3|         4|  -3.5071216|  4.784812|-5.4449472|
| row4|         3|   -5.122823|-3.3220499|-0.5069805|
| row5|         1|  -2.4764006|  8.255791|  4.409478|
| row6|         5|   7.3153954| -5.079449| -7.291215|
| row7|         1|  -2.0167463|  9.303454|  7.095179|
| row8|         7|  -0.2338185| -4.892681| 2.1228876|
| row9|         5|    6.565442| -6.855994|-6.7983212|
|row10|         3|  -5.6902847|-6.4827404|-0.9246967|
|row11|         4|-0.017986143| 2.7632365| -8.814824|
|row12|         9|  -6.9042625|-6.1491723|-3.5354295|
|row13|         1|  -10.389865|  9.537853|  0.674591|
|row14|         2|   3.9688683|-6.0467844| -5.462389|
|row15|         9|   -7.337052|-3.7689247| -5.261122|
|row16|         1|   -8.991589|  8.738728|  3.864116|
|row17|         4| -0.18098584|  5.482743| -4.900118|
|row18|         2|   3.3193955|-6.3573766| -6.978025|
|row19|         7|  -2.0266335|-3.4171724|0.48218703|
+-----+----------+------------+----------+----------+
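If the cluster sizes should also end up on the main dataframe, one option (my own sketch, not from the original post) is to recount them in Spark and join them back on the prediction column:

from pyspark.sql import functions as F

# count rows per predicted cluster, then join the counts back onto the prediction dataframe
sizes = pddf_pred.groupBy("prediction").agg(F.count("*").alias("size"))
pddf_pred_with_size = pddf_pred.join(sizes, on="prediction", how="left")
pddf_pred_with_size.show(5)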

Now I want to include/report the top n rows of each cluster in the form of a dataframe via a function. What I have tried so far is filtering with (multiple) conditions:

print("==========================Short report==================================== ")

n_clusters = model.summary.k
#n_clusters
print("Number of predicted clusters: " + str(n_clusters))

cluster_Sizes = model.summary.clusterSizes
#cluster_Sizes 

col = ['size']
clusterSizes = pd.DataFrame(cluster_Sizes, columns=col).sort_values(by=['size'], ascending=True)  #sorting
cluster_Sizes = clusterSizes["size"].unique()
print("Size of predicted clusters: " + str(cluster_Sizes))
clusterSizes

from pyspark.sql.functions import max, min

def cls_report(df):
  x1 = df.select([min("x")])        # min of x for the filtered cluster
  x2 = df.select([max("y")])        # max of y
  x3 = df.select([max("z")])        # max of z
  return x1, x2, x3

#pick the top "out" clusters, i.e. the clusters with the fewest instances
km_1st_cls = clusterSizes.index[0]   # cluster label of the smallest cluster
km_2nd_cls = clusterSizes.index[1]
km_3rd_cls = clusterSizes.index[2]
print(km_1st_cls)
print(km_2nd_cls)
print(km_3rd_cls)


#F1 = cls_report(pddf_pred.filter(f"prediction == {km_1st_cls}"))[0]
F1 = cls_report(pddf_pred.filter(f"prediction == {km_1st_cls}"))[0].select("min(x)").rdd.flatMap(list).collect()[0]
F2 = cls_report(pddf_pred.filter(f"prediction == {km_2nd_cls}"))[0].select("min(x)").rdd.flatMap(list).collect()[0]
F3 = cls_report(pddf_pred.filter(f"prediction == {km_3rd_cls}"))[0].select("min(x)").rdd.flatMap(list).collect()[0]

L1 = cls_report(pddf_pred.filter(f"prediction == {km_1st_cls}"))[1].select("max(y)").rdd.flatMap(list).collect()[0]
L2 = cls_report(pddf_pred.filter(f"prediction == {km_2nd_cls}"))[1].select("max(y)").rdd.flatMap(list).collect()[0]
L3 = cls_report(pddf_pred.filter(f"prediction == {km_3rd_cls}"))[1].select("max(y)").rdd.flatMap(list).collect()[0]

T1 = cls_report(pddf_pred.filter(f"prediction == {km_1st_cls}"))[2].select("max(z)").rdd.flatMap(list).collect()[0]
T2 = cls_report(pddf_pred.filter(f"prediction == {km_2nd_cls}"))[2].select("max(z)").rdd.flatMap(list).collect()[0]
T3 = cls_report(pddf_pred.filter(f"prediction == {km_3rd_cls}"))[2].select("max(z)").rdd.flatMap(list).collect()[0]

print(F1)
print(F2)
print(F3)

print(L1)
print(L2)
print(L3)

print(T1)
print(T2)
print(T3)


df_anomaly_1st_cls = pddf_pred.filter(f"(prediction == {km_1st_cls})") \
                                .filter(f"y == {L1}") \
                                .filter(f"z == {T1}") \
                                .filter(f"x == {F1}") 

display(df_anomaly_1st_cls)
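As a hedged alternative to calling cls_report once per cluster and per statistic, the same per-cluster min(x), max(y) and max(z) can be computed in a single aggregation (a sketch, reusing the pddf_pred naming above):

from pyspark.sql import functions as F

# one pass over the data instead of nine filtered cls_report calls
cluster_stats = (
    pddf_pred.groupBy("prediction")
             .agg(F.min("x").alias("min_x"),
                  F.max("y").alias("max_y"),
                  F.max("z").alias("max_z"))
)
cluster_stats.show()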

I know that with scikit-learn's KMeans we can do the following, based on this post:

clusters = KMeans(n_clusters=5).fit(df)
df[clusters.labels_ == 0]

But we cannot access such labels_ in Spark to quickly hack this task. Is there an elegant way (perhaps a dedicated function) to do this, so that we can reflect the results of any clustering algorithm back onto the main dataframe for better reasoning?
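For reference, the closest counterpart in Spark ML that I am aware of is filtering on the prediction column of the transformed dataframe; a minimal sketch, assuming the fitted model and df_features names from the earlier sketch:

# analogous to df[clusters.labels_ == 0] in scikit-learn:
# keep only the rows assigned to cluster 0
cluster_0_rows = model.transform(df_features).filter("prediction == 0")
cluster_0_rows.show(5)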

Note: I am not interested in hacking this by converting the Spark frame to a Pandas dataframe with .toPandas().

Update: I probably need a function that takes the input dataframe, the number of top "out" clusters (ranked by their instance counts), and the number of events/rows, automatically filters based on multiple conditions, and returns the filtered/selected results stacked in the output; a mock-up of the expected dataframe is as follows:

def filtering(df, top_out_cluster=2, top_events_rows=2):

#                            +-----+----------+------------+----------+----------+
#                            |   id|prediction|           x|         y|         z|
#                            +-----+----------+------------+----------+----------+
#1st top out cluster         | row4|         3|   -5.122823|-3.3220499|-0.5069805|
#conditions F1, L1, T1       |row10|         3|  -5.6902847|-6.4827404|-0.9246967|
    
#2nd top out cluster         | row8|         7|  -0.2338185| -4.892681| 2.1228876|
#conditions F1, L1, T1       |row19|         7|  -2.0266335|-3.4171724|0.48218703|

#3rd top out cluster         |row18|         2|   3.3193955|-6.3573766| -6.978025|
#conditions F1, L1, T1       |row14|         2|   3.9688683|-6.0467844| -5.462389|
                            

#1st top out cluster         | row6|         5|   7.3153954| -5.079449| -7.291215|
#conditions F2, L2, T2       | row9|         5|    6.565442| -6.855994|-6.7983212|

#2nd top out cluster         |row12|         9|  -6.9042625|-6.1491723|-3.5354295|
#conditions F2, L2, T2       | row0|         9|  -6.0776997|-2.9096103|-1.5181729|

#1st top out cluster         | row1|         4|  -1.0122601|  7.322841|-5.4424076|
#conditions F3, L3, T3       |row11|         4|-0.017986143| 2.7632365| -8.814824|
                            
#2nd top out cluster         |row13|         1|  -10.389865|  9.537853|  0.674591|
#conditions F3, L3, T3       | row5|         1|  -2.4764006|  8.255791|  4.409478|
#                            +-----+----------+------------+----------+----------+

Per the requirements you discussed on colab, below is the code.

import pandas as pd
from pyspark.sql.window import Window
from pyspark.sql import functions as f

def get_top_n_clusters(model, top_out_cluster: int):
  n_clusters = model.summary.k
  cluster_size = model.summary.clusterSizes

  if top_out_cluster > n_clusters:
    raise ValueError(f"top_out_cluster ({top_out_cluster}) cannot be greater than the number of clusters ({n_clusters})")

  col = ['size']
  cluster_size = pd.DataFrame(cluster_size, columns=col).sort_values(by=['size'], ascending=True)  # sort clusters by size
  return list(cluster_size.head(top_out_cluster).index)  # labels of the smallest clusters

def filtering(df, labels: list, top_records: int):
  # note: ordering by the partition key gives an arbitrary order within each cluster
  winspec = Window.partitionBy("prediction").orderBy("prediction")
  return (
      df.filter(f.col("prediction").isin(labels))
        .withColumn("rowNum", f.row_number().over(winspec))
        .withColumn("minX", f.min(f.col("x")).over(winspec))
        .withColumn("maxY", f.max(f.col("y")).over(winspec))
        .withColumn("maxZ", f.max(f.col("z")).over(winspec))
        .filter(f.col("rowNum") <= top_records)
        .selectExpr("id", "prediction", "minX as x", "maxY as y", "maxZ as z")
  )

cluster_labels = get_top_n_clusters(model, top_out_cluster=4)
fdf = filtering(df_pred, labels=cluster_labels, top_records=1)
fdf.show()

+-----+----------+----------+----------+-----------+
|   id|prediction|         x|         y|          z|
+-----+----------+----------+----------+-----------+
|row15|         7|-10.505684|-1.6820424|-0.32242402|
| row4|         9| -9.426199| 0.5639291|  3.5664654|
| row0|         2| -8.317323|-1.3278837|-0.33906546|
|row14|         4|-0.9338185|-3.6411285| -3.7280529|
+-----+----------+----------+----------+-----------+
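One design note on the window above: it partitions and orders by the same prediction column, so the order of rows inside each cluster is arbitrary and the "top" rows are effectively an arbitrary sample. If a reproducible pick is preferred, the window can be ordered by another column; a hedged variant (ordering by id is an assumption):

from pyspark.sql.window import Window
from pyspark.sql import functions as f

# order rows inside each cluster by id so repeated runs select the same rows
winspec_det = Window.partitionBy("prediction").orderBy(f.col("id").asc())

fdf_det = (
    df_pred.filter(f.col("prediction").isin(cluster_labels))
           .withColumn("rowNum", f.row_number().over(winspec_det))
           .filter(f.col("rowNum") <= 1)
           .drop("rowNum")
)
fdf_det.show()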