Pyspark上SOM(自组织映射)的高效实现
Efficient implementation of SOM (Self organizing map) on Pyspark
我正在努力在 Spark/Pyspark 上为具有 > 100 个特征的庞大数据集实现 SOM 批处理算法的高性能版本。
我觉得我可以在 can/have 的地方使用 RDD 自行指定并行化,或者我使用 Dataframe 应该性能更高,但我看不出如何为每个工作人员使用像本地累积变量这样的东西使用数据框时。
想法:
- 使用累加器。通过创建一个 UDF 来并行计算,该 UDF 将观察结果作为输入,计算对网络的影响并将影响发送到驱动程序中的累加器。 (已经实现了这个版本,但看起来相当慢(我认为累加器更新需要很长时间))
- 将结果存储在Dataframe 的新列中,最后将它们相加。 (必须在每一行中存储整个神经网络(例如 20*20*130))Spark 优化算法是否意识到,它不需要保存每个网络而只需将它们相加?
- 使用类似于以下的 RDD 创建自定义并行化算法:https://machinelearningnepal.com/2018/01/22/apache-spark-implementation-of-som-batch-algorithm/(但具有更高性能的计算算法)。但我将不得不使用某种循环遍历每一行并更新网络 -> 听起来性能相当低下。)
对不同的选项有什么想法吗?还有更好的选择吗?
或者所有的想法都不是那么好,我应该只预选我的数据集的最大多样性子集,然后在本地训练一个 SOM。
谢谢!
这正是我去年所做的,所以我可能正好可以给你一个答案。
首先,here is my Spark implementation of the batch SOM algorithm(它是用 Scala 编写的,但大多数东西在 Pyspark 中都是相似的)。
我的一个项目需要这个算法,我发现的每个实现都至少有以下两个问题或限制之一:
- 他们并没有真正实现批处理 SOM 算法,而是使用了地图平均方法,这给了我奇怪的结果(输出地图中的异常对称)
- 他们没有使用 DataFrame API(纯 RDD API)并且不符合 Spark ML/MLlib 精神,即使用简单的
fit()
/transform()
API 对数据帧进行操作。
所以,我继续自己编写代码:Spark ML 风格的批处理 SOM 算法。我做的第一件事是查看 k-means 在 Spark ML 中的实现方式,因为如您所知,批处理 SOM 与 k-means 算法非常相似。实际上,我可以重用大部分 Spark ML k-means 代码,但我必须修改核心算法和超参数。
我可以快速总结模型是如何构建的:
- 一个
SOMParams
class,包含SOM超参数(大小、训练参数等)
- A
SOM
class,继承spark的Estimator
,包含训练算法。特别是,它包含一个对输入 DataFrame
进行操作的 fit()
方法,其中特征存储为单个列中的 spark.ml.linalg.Vector
。 fit()
然后将 select 这一列并解包 DataFrame
以获得原始的 RDD[Vector]
特征,并在其上调用 run()
方法。这是所有计算发生的地方,正如您猜到的那样,它使用 RDD
s、累加器和广播变量。最后,fit()
方法returns一个SOMModel
对象。
SOMModel
是经过训练的SOM模型,继承自spark的Transformer
/Model
。它包含地图原型(中心向量),并包含一个 transform()
方法,该方法可以通过获取输入特征列并添加带有预测(地图上的投影)的新列来对 DataFrames
进行操作。这是通过预测 UDF 完成的。
- 还有
SOMTrainingSummary
收集诸如 objective 函数之类的东西。
要点如下:
-
RDD
和 DataFrame
之间并没有真正的对立(或者更确切地说 Dataset
之间,但这两者之间的区别在这里并不重要)。它们只是在不同的上下文中使用。实际上,DataFrame 可以看作是 RDD
专门用于操作按列组织的结构化数据(例如关系表),允许进行类似 SQL 的操作和执行计划的优化(Catalyst 优化器) .
- 对于结构化数据,select/filter/aggregation 操作,务必使用
Dataframe
s。
- ...但是对于机器学习算法等更复杂的任务,您需要返回到
RDD
API 并使用 map/mapPartitions/foreach/reduce/reduceByKey/and 自己分配计算,所以儿子。看看 MLlib 是如何完成的:它只是对 RDD 操作的一个很好的包装!
希望它能解决您的问题。关于性能,正如您要求的 efficient 实现,我还没有做任何基准测试,但我在工作中使用它,它在几分钟内处理了 500k/1M 行数据集生产集群。
我正在努力在 Spark/Pyspark 上为具有 > 100 个特征的庞大数据集实现 SOM 批处理算法的高性能版本。 我觉得我可以在 can/have 的地方使用 RDD 自行指定并行化,或者我使用 Dataframe 应该性能更高,但我看不出如何为每个工作人员使用像本地累积变量这样的东西使用数据框时。
想法:
- 使用累加器。通过创建一个 UDF 来并行计算,该 UDF 将观察结果作为输入,计算对网络的影响并将影响发送到驱动程序中的累加器。 (已经实现了这个版本,但看起来相当慢(我认为累加器更新需要很长时间))
- 将结果存储在Dataframe 的新列中,最后将它们相加。 (必须在每一行中存储整个神经网络(例如 20*20*130))Spark 优化算法是否意识到,它不需要保存每个网络而只需将它们相加?
- 使用类似于以下的 RDD 创建自定义并行化算法:https://machinelearningnepal.com/2018/01/22/apache-spark-implementation-of-som-batch-algorithm/(但具有更高性能的计算算法)。但我将不得不使用某种循环遍历每一行并更新网络 -> 听起来性能相当低下。)
对不同的选项有什么想法吗?还有更好的选择吗?
或者所有的想法都不是那么好,我应该只预选我的数据集的最大多样性子集,然后在本地训练一个 SOM。 谢谢!
这正是我去年所做的,所以我可能正好可以给你一个答案。
首先,here is my Spark implementation of the batch SOM algorithm(它是用 Scala 编写的,但大多数东西在 Pyspark 中都是相似的)。
我的一个项目需要这个算法,我发现的每个实现都至少有以下两个问题或限制之一:
- 他们并没有真正实现批处理 SOM 算法,而是使用了地图平均方法,这给了我奇怪的结果(输出地图中的异常对称)
- 他们没有使用 DataFrame API(纯 RDD API)并且不符合 Spark ML/MLlib 精神,即使用简单的
fit()
/transform()
API 对数据帧进行操作。
所以,我继续自己编写代码:Spark ML 风格的批处理 SOM 算法。我做的第一件事是查看 k-means 在 Spark ML 中的实现方式,因为如您所知,批处理 SOM 与 k-means 算法非常相似。实际上,我可以重用大部分 Spark ML k-means 代码,但我必须修改核心算法和超参数。
我可以快速总结模型是如何构建的:
- 一个
SOMParams
class,包含SOM超参数(大小、训练参数等) - A
SOM
class,继承spark的Estimator
,包含训练算法。特别是,它包含一个对输入DataFrame
进行操作的fit()
方法,其中特征存储为单个列中的spark.ml.linalg.Vector
。fit()
然后将 select 这一列并解包DataFrame
以获得原始的RDD[Vector]
特征,并在其上调用run()
方法。这是所有计算发生的地方,正如您猜到的那样,它使用RDD
s、累加器和广播变量。最后,fit()
方法returns一个SOMModel
对象。 SOMModel
是经过训练的SOM模型,继承自spark的Transformer
/Model
。它包含地图原型(中心向量),并包含一个transform()
方法,该方法可以通过获取输入特征列并添加带有预测(地图上的投影)的新列来对DataFrames
进行操作。这是通过预测 UDF 完成的。- 还有
SOMTrainingSummary
收集诸如 objective 函数之类的东西。
要点如下:
-
RDD
和DataFrame
之间并没有真正的对立(或者更确切地说Dataset
之间,但这两者之间的区别在这里并不重要)。它们只是在不同的上下文中使用。实际上,DataFrame 可以看作是RDD
专门用于操作按列组织的结构化数据(例如关系表),允许进行类似 SQL 的操作和执行计划的优化(Catalyst 优化器) . - 对于结构化数据,select/filter/aggregation 操作,务必使用
Dataframe
s。 - ...但是对于机器学习算法等更复杂的任务,您需要返回到
RDD
API 并使用 map/mapPartitions/foreach/reduce/reduceByKey/and 自己分配计算,所以儿子。看看 MLlib 是如何完成的:它只是对 RDD 操作的一个很好的包装!
希望它能解决您的问题。关于性能,正如您要求的 efficient 实现,我还没有做任何基准测试,但我在工作中使用它,它在几分钟内处理了 500k/1M 行数据集生产集群。