如何在 Scala 中保存 RandomForestClassifier Spark 模型?

How to save RandomForestClassifier Spark model in scala?

我使用以下代码构建了一个随机森林模型:

import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.ml.classification.RandomForestClassifier
val rf = new RandomForestClassifier().setLabelCol("indexedLabel").setFeaturesCol("features")
val labelConverter = new    IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
val training = labelIndexer.transform(df)
val model = rf.fit(training)

现在我想保存模型以便稍后使用以下代码进行预测:

val predictions: DataFrame = model.transform(testData)

我查看了 Spark 文档 here,但没有找到执行该操作的任何选项。任何的想法? 我花了几个小时来构建模型,如果 Spark 崩溃,我将无法恢复。

它位于 MLWriter 界面中 - 可通过模型上的 writer 属性访问:

model.asInstanceOf[MLWritable].write.save(path)

界面如下:

abstract class MLWriter extends BaseReadWrite with Logging {

  protected var shouldOverwrite: Boolean = false

  /**
   * Saves the ML instances to the input path.
   */
  @Since("1.6.0")
  @throws[IOException]("If the input path already exists but overwrite is not enabled.")
  def save(path: String): Unit = {

这是对 mllib/spark.ml

早期版本的重构

更新 模型似乎不可可写:

Exception in thread "main" java.lang.UnsupportedOperationException: Pipeline write will fail on this Pipeline because it contains a stage which does not implement Writable. Non-Writable stage: rfc_4e467607406f of type class org.apache.spark.ml.classification.RandomForestClassificationModel

因此可能没有直接的解决方案。

可以使用 Spark 1.6 在 HDFS 中保存和重新加载基于树的模型,对基于管道的模型和基本模型使用 saveAsObjectFile()。 下面是基于管道模型的示例。

// model
val model = pipeline.fit(trainingData)

// Create rdd using Seq 
sc.parallelize(Seq(model), 1).saveAsObjectFile("hdfs://filepath")

// Reload model by using it's class
// You can get class of object using object.getClass()
val sameModel = sc.objectFile[PipelineModel]("filepath").first()

对于 RandomForestClassifier 保存和加载模型:测试 spark 1.6.2 + scala in ml(在 spark 2.0 中,您可以为模型提供直接保存选项)

import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.ml.classification.RandomForestClassifier //imports
val classifier = new RandomForestClassifier().setImpurity("gini").setMaxDepth(3).setNumTrees(20).setFeatureSubsetStrategy("auto").setSeed(5043)
val model = classifier.fit(trainingData)

sc.parallelize(Seq(model), 1).saveAsObjectFile(modelSavePath) //保存模型

val linRegModel = sc.objectFile[RandomForestClassificationModel](modelSavePath).first() //load model
`val predictions1 = linRegModel.transform(testData)` //predictions1  is dataframe 

这是对应于上述 Scala saveAsObjectFile() 答案的 PySpark v1.6 实现。

它强制 Python 个对象 to/from Java 个对象以使用 saveAsObjectFile() 实现序列化。

没有 Java 强制,我在序列化时遇到了奇怪的 Py4J 错误。如果有人有更简单的实现,请编辑或评论。

保存经过训练的 RandomForestClassificationModel 对象:

# Save RandomForestClassificationModel to hdfs
gateway = sc._gateway
java_list = gateway.jvm.java.util.ArrayList()
java_list.add(rfModel._java_obj)
modelRdd = sc._jsc.parallelize(java_list)
modelRdd.saveAsObjectFile("hdfs:///some/path/rfModel")

加载经过训练的 RandomForestClassificationModel 对象:

# Load RandomForestClassificationModel from hdfs
rfObjectFileLoaded = sc._jsc.objectFile("hdfs:///some/path/rfModel")
rfModelLoaded_JavaObject = rfObjectFileLoaded.first()
rfModelLoaded = RandomForestClassificationModel(rfModelLoaded_JavaObject)
predictions = rfModelLoaded.transform(test_input_df)