Pyspark上SOM(自组织映射)的高效实现

Efficient implementation of SOM (Self organizing map) on Pyspark

我正在努力在 Spark/Pyspark 上为具有 > 100 个特征的庞大数据集实现 SOM 批处理算法的高性能版本。 我觉得我可以在 can/have 的地方使用 RDD 自行指定并行化,或者我使用 Dataframe 应该性能更高,但我看不出如何为每个工作人员使用像本地累积变量这样的东西使用数据框时。

想法:

对不同的选项有什么想法吗?还有更好的选择吗?

或者所有的想法都不是那么好,我应该只预选我的数据集的最大多样性子集,然后在本地训练一个 SOM。 谢谢!

这正是我去年所做的,所以我可能正好可以给你一个答案。

首先,here is my Spark implementation of the batch SOM algorithm(它是用 Scala 编写的,但大多数东西在 Pyspark 中都是相似的)。

我的一个项目需要这个算法,我发现的每个实现都至少有以下两个问题或限制之一:

  • 他们并没有真正实现批处理 SOM 算法,而是使用了地图平均方法,这给了我奇怪的结果(输出地图中的异常对称)
  • 他们没有使用 DataFrame API(纯 RDD API)并且不符合 Spark ML/MLlib 精神,即使用简单的 fit()/transform() API 对数据帧进行操作。

所以,我继续自己编写代码:Spark ML 风格的批处理 SOM 算法。我做的第一件事是查看 k-means 在 Spark ML 中的实现方式,因为如您所知,批处理 SOM 与 k-means 算法非常相似。实际上,我可以重用大部分 Spark ML k-means 代码,但我必须修改核心算法和超参数。

我可以快速总结模型是如何构建的:

  1. 一个SOMParamsclass,包含SOM超参数(大小、训练参数等)
  2. A SOM class,继承spark的Estimator,包含训练算法。特别是,它包含一个对输入 DataFrame 进行操作的 fit() 方法,其中特征存储为单个列中的 spark.ml.linalg.Vectorfit() 然后将 select 这一列并解包 DataFrame 以获得原始的 RDD[Vector] 特征,并在其上调用 run() 方法。这是所有计算发生的地方,正如您猜到的那样,它使用 RDDs、累加器和广播变量。最后,fit()方法returns一个SOMModel对象。
  3. SOMModel是经过训练的SOM模型,继承自spark的Transformer/Model。它包含地图原型(中心向量),并包含一个 transform() 方法,该方法可以通过获取输入特征列并添加带有预测(地图上的投影)的新列来对 DataFrames 进行操作。这是通过预测 UDF 完成的。
  4. 还有 SOMTrainingSummary 收集诸如 objective 函数之类的东西。

要点如下:

  • RDDDataFrame 之间并没有真正的对立(或者更确切地说 Dataset 之间,但这两者之间的区别在这里并不重要)。它们只是在不同的上下文中使用。实际上,DataFrame 可以看作是 RDD 专门用于操作按列组织的结构化数据(例如关系表),允许进行类似 SQL 的操作和执行计划的优化(Catalyst 优化器) .
  • 对于结构化数据,select/filter/aggregation 操作,务必使用 Dataframes。
  • ...但是对于机器学习算法等更复杂的任务,您需要返回到 RDD API 并使用 map/mapPartitions/foreach/reduce/reduceByKey/and 自己分配计算,所以儿子。看看 MLlib 是如何完成的:它只是对 RDD 操作的一个很好的包装!

希望它能解决您的问题。关于性能,正如您要求的 efficient 实现,我还没有做任何基准测试,但我在工作中使用它,它在几分钟内处理了 500k/1M 行数据集生产集群。