Loading persisted CrossValidatorModel throws "Param numTrees does not exist" Error

I am using Spark 2.0 to build a RandomForestClassifier for a multiclass classification problem. I can train the model successfully and save it to an S3 bucket with model.save(). However, loading this model back with load() fails with the following error.

Exception in thread "main" java.util.NoSuchElementException: Param numTrees does not exist.
    at org.apache.spark.ml.param.Params$$anonfun$getParam.apply(params.scala:609)
    at org.apache.spark.ml.param.Params$$anonfun$getParam.apply(params.scala:609)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.ml.param.Params$class.getParam(params.scala:608)
    at org.apache.spark.ml.PipelineStage.getParam(Pipeline.scala:42)
    at org.apache.spark.ml.util.DefaultParamsReader$$anonfun$getAndSetParams.apply(ReadWrite.scala:430)
    at org.apache.spark.ml.util.DefaultParamsReader$$anonfun$getAndSetParams.apply(ReadWrite.scala:429)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.ml.util.DefaultParamsReader$.getAndSetParams(ReadWrite.scala:429)
    at org.apache.spark.ml.classification.RandomForestClassificationModel$RandomForestClassificationModelReader.load(RandomForestClassifier.scala:310)
    at org.apache.spark.ml.classification.RandomForestClassificationModel$RandomForestClassificationModelReader.load(RandomForestClassifier.scala:284)
    at org.apache.spark.ml.util.DefaultParamsReader$.loadParamsInstance(ReadWrite.scala:447)
    at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun.apply(Pipeline.scala:267)
    at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun.apply(Pipeline.scala:265)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.ml.Pipeline$SharedReadWrite$.load(Pipeline.scala:265)
    at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:341)
    at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:335)
    at org.apache.spark.ml.util.DefaultParamsReader$.loadParamsInstance(ReadWrite.scala:447)
    at org.apache.spark.ml.tuning.CrossValidatorModel$CrossValidatorModelReader.load(CrossValidator.scala:269)
    at org.apache.spark.ml.tuning.CrossValidatorModel$CrossValidatorModelReader.load(CrossValidator.scala:256)
    at org.apache.spark.ml.util.MLReadable$class.load(ReadWrite.scala:227)
    at org.apache.spark.ml.tuning.CrossValidatorModel$.load(CrossValidator.scala:240)
    at org.apache.spark.ml.tuning.CrossValidatorModel.load(CrossValidator.scala)

Below is the code snippet I use to train and save the model:

import java.util.ArrayList

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, VectorAssembler}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// df, inputColumnNames, labelIndexer and categoricalInputModels are defined earlier
val assembler = new VectorAssembler()
assembler.setInputCols(inputColumnNames)
assembler.setOutputCol("Inputs_Indexed")

// Split 70:30 into training and test data
val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3))

// Train using a RandomForest model
val rf = new RandomForestClassifier()
  .setLabelCol("Facing_Indexed")
  .setFeaturesCol("Inputs_Indexed")
  .setNumTrees(500)

// Map predicted indices back to the original label strings
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

val stageList = new ArrayList[PipelineStage]
stageList.addAll(categoricalInputModels)
stageList.add(labelIndexer)
stageList.add(assembler)
stageList.add(rf)
stageList.add(labelConverter)

// Convert the stage list to an array
val stages = new Array[PipelineStage](stageList.size)
stageList.toArray(stages)

val pipeline = new Pipeline().setStages(stages)

// Cross-validate over three tree depths
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.maxDepth, Array(3, 5, 8))
  .build()

val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("Facing_Indexed")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)

val model = cv.fit(trainingData)

val predictions = model.transform(testData)

predictions.select("predictedLabel", "Facing", "Inputs_Indexed").show(5)

val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))

model.save("s3n://xyz_path/au.model")

After saving the trained model, I load it in a separate Java program with CrossValidatorModel.load("s3n://xyz_path/au.model"), which throws the error above. In my S3 bucket I can see the serialized model being written, so I am not sure what is going wrong. Any help with this error is appreciated.
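
For reference, the loading side looks roughly like this (a minimal sketch of my separate Java program; the class name and the scoring input path are placeholders, not my actual code):

import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ModelLoader {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("LoadPersistedModel") // placeholder name
                .getOrCreate();

        // This is the call that fails with "Param numTrees does not exist"
        CrossValidatorModel model =
                CrossValidatorModel.load("s3n://xyz_path/au.model");

        // Score some new data with the loaded model (placeholder input path)
        Dataset<Row> newData = spark.read().parquet("s3n://xyz_path/new_data");
        model.transform(newData).show(5);
    }
}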

I figured out the problem. The AWS EMR cluster was running Spark 2.1.0, and that is what I used to train and save my model to the S3 bucket. However, my Java program was pointing to version 2.0.0 of Spark MLlib. The 2.0-to-2.1 migration guide (http://spark.apache.org/docs/latest/ml-guide.html#from-20-to-21) reports a breaking change in RandomForestClassificationModel around the "numTrees" param: in 2.1 numTrees became a real Param (the old numTrees method is deprecated in favor of getNumTrees), so model metadata written by 2.1 contains a numTrees entry that the 2.0 reader does not recognize, which is exactly what the exception says.
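
A quick sanity check that would have surfaced the mismatch is printing the Spark version each program actually runs against (a minimal Java sketch; on the Scala side the equivalent is spark.version):

import org.apache.spark.sql.SparkSession;

public class VersionCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("VersionCheck") // placeholder name
                .getOrCreate();

        // Prints the Spark version on the classpath at runtime,
        // e.g. "2.0.0" in the loading program vs "2.1.0" on the EMR cluster
        System.out.println("Spark version: " + spark.version());
    }
}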

So, finally, I updated the Spark MLlib Maven dependency in my Java project to point to version 2.1.0:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

Then it complained about one more missing class:

java.lang.NoClassDefFoundError: org/codehaus/commons/compiler/UncheckedCompileException

That was fixed by adding the commons-compiler dependency:

<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>commons-compiler</artifactId>
    <version>2.7.8</version>
</dependency>

And with that, my persisted model finally loaded successfully!