聚类中心在 Spark MLlib 中具有不同的维度

Cluster centers have different dimensionality in Spark MLlib

我使用 Spark 2.0.2。我有按天分区的数据。我想对彼此独立的不同分区进行聚类,而不是比较聚类中心(计算它们之间的距离)以查看聚类如何随时间变化。

我对每个分区进行完全相同的预处理(缩放、一次热编码等)。我为此使用了一个预定义的管道,它在 "normal" 学习和预测环境中完美运行。但是当我要计算聚类中心之间的距离时,不同分区对应的向量具有不同的大小(不同维度)。

一些代码片段:

预处理管道是这样构建的:

val protoIndexer = new StringIndexer().setInputCol("protocol").setOutputCol("protocolIndexed").setHandleInvalid("skip")
val serviceIndexer = new StringIndexer().setInputCol("service").setOutputCol("serviceIndexed").setHandleInvalid("skip")
val directionIndexer = new StringIndexer().setInputCol("direction").setOutputCol("directionIndexed").setHandleInvalid("skip")

val protoEncoder = new OneHotEncoder().setInputCol("protocolIndexed").setOutputCol("protocolEncoded")
val serviceEncoder = new OneHotEncoder().setInputCol("serviceIndexed").setOutputCol("serviceEncoded")
val directionEncoder = new OneHotEncoder().setInputCol("directionIndexed").setOutputCol("directionEncoded")

val scaleAssembler = new VectorAssembler().setInputCols(Array("duration", "bytes", "packets", "tos", "host_count", "srv_count")).setOutputCol("scalableFeatures")
val scaler = new StandardScaler().setInputCol("scalableFeatures").setOutputCol("scaledFeatures")
val featureAssembler = new VectorAssembler().setInputCols(Array("scaledFeatures", "protocolEncoded", "urgent", "ack", "psh", "rst", "syn", "fin", "serviceEncoded", "directionEncoded")).setOutputCol("features")
val pipeline = new Pipeline().setStages(Array(protoIndexer, protoEncoder, serviceIndexer, serviceEncoder, directionIndexer, directionEncoder, scaleAssembler, scaler, featureAssembler))
pipeline.write.overwrite().save(config.getString("pipeline"))

定义 k-means,加载预定义的预处理管道,将 k-means 添加到管道中:

val kmeans = new KMeans().setK(40).setTol(1.0e-6).setFeaturesCol("features")
val pipelineStages = Pipeline.load(config.getString("pipeline")).getStages
val pipeline = new Pipeline().setStages(pipelineStages ++ Array(kmeans))

加载数据分区、计算特征、拟合管道、获取 k-means 模型并显示第一个聚类中心的大小作为示例:

(1 to 7 by 1).map { day =>
  val data = sparkContext.textFile("path/to/data/" + day + "/")
  val rawFeatures = data.map(extractFeatures....).toDF(featureHeaders: _*)
  val model = pipeline.fit(rawFeatures)

  val kmeansModel = model.stages(model.stages.size - 1).asInstanceOf[KMeansModel]
  println(kmeansModel.clusterCenters(0).size)
}

对于不同的分区,聚类中心具有不同的维度(但对于分区内的 40 个聚类中的每一个都相同)。所以我无法计算它们之间的距离。我怀疑它们的大小都相等(即我的欧几里德 space 的大小是 13,因为我有 13 个特征)。但它给出了我不明白的奇怪数字。

我将提取的特征向量保存到一个文件中以检查它们。他们的格式是可疑的。每个功能都存在。

知道我做错了什么或者我有什么误解吗?谢谢!

跳过 KMeans is not a good choice for processing categorical data 您的代码不能保证的事实:

  • 同一索引-批次之间的特征关系。 StringIndexer 按照频率分配标签。最常见的字符串编码为 0,最不常见的编码为 numLabels - 1.
  • 批次之间的索引数量相同,因此 one-hot-encoded 和组装向量的形状相同。矢量的大小等于根据 OneHotEncoder.
  • 中的 dropLast 参数值调整的唯一标签数

因此,编码向量在不同批次之间可能具有不同的维度和解释。

如果您想要一致的编码,您将需要持久的字典映射,以确保批次之间的一致索引。