How can I create an Array of fitted PipelineModelS in Spark 2.1.0?

I am trying to store the fitted PipelineModel of each trained decision tree into an Array. Although I created an array of PipelineModelS, I get the following type mismatch error:

<console>:96: error: type mismatch;
found   : model.type (with underlying type org.apache.spark.ml.PipelineModel)
required: org.apache.spark.ml.PipelineModel.type
           bestModels(i) = model // Here is the problem!!!

Can someone help me? The iris flowers dataset (in libsvm format) is available here: https://1drv.ms/u/s!Antm9EMPXrQmgP9zQhgdAdxUBSAtSA. Here is the sample code:

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.sql.functions.rand
import org.apache.spark.sql.types._

val folds = 10
val data =  spark.read.format("libsvm").load("/home/vitrion/Documents/iris.libsvm")
var accuracies = Array.fill(folds)(0.0)
var bestModels = Array.fill(folds)(PipelineModel) // This is the array of PipelineModelS

val Array(trainData, testData) = data.orderBy(rand()).randomSplit(Array(0.7, 0.3), seed = 1234L)
val foldedData = trainData.orderBy(rand()).randomSplit(Array.fill(10)(1.0 / folds))
val evaluator1 = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")

for( i <- 0 to folds - 1 ){
    var provTrainData = data.limit(0)
    var provTestData = data.limit(0)
    var foldStr = ""
    for( j <- 0 to folds - 1){
        var str = ""
        if (i != j) {
            provTrainData = provTrainData.union(foldedData(j))
            str = "T"
        } else {
            provTestData = foldedData(i)
            str = "S"
        }
        foldStr += str
    }
    println(foldStr)
    val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(trainData)
    val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(6).fit(trainData)
    val dt = new DecisionTreeClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
    val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
    val pipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))            
    val model = pipeline.fit(provTrainData)

    var provPredictions = model.transform(provTestData)
    accuracies(i) = evaluator1.evaluate(provPredictions)
    bestModels(i) = model // Here is the problem!!!
    println("FOLD " + i + "\nAccuracy: " + accuracies(i))
}

The immediate problem is that:

Array.fill(folds)(PipelineModel)

creates an Array[PipelineModel.type], not an Array[PipelineModel]: PipelineModel on its own refers to the companion object, so the inferred element type is its singleton type. You can either:

val bestModels: Array[PipelineModel] = Array.ofDim[PipelineModel](folds)

or:

val bestModels: Array[PipelineModel] = Array.fill(folds)(null)
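
If you would rather not pre-size the array at all, a growable collection works too. A minimal sketch with ArrayBuffer (an alternative, not something required by the fixes above):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.ml.PipelineModel

// Append each fitted model as the loop produces it
val bestModels = ArrayBuffer.empty[PipelineModel]
// ... inside the loop:
// bestModels += model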

On a side note, there is no need for var here. Since the array itself is a mutable object, val will do just fine.
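
For example (a tiny sketch, reusing folds from the code above):

// val only fixes the reference; the array's contents can still be updated in place
val accuracies = Array.fill(folds)(0.0)
accuracies(0) = 0.95       // fine: mutates an element
// accuracies = Array(1.0) // would not compile: a val cannot be reassigned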

Furthermore, the Pipeline definition can be simplified to:

val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .setHandleInvalid("skip")
val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("features")

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, dt))

or even to a standalone DecisionTreeClassifier (adjusting the evaluator's labelCol accordingly):

val dt = new DecisionTreeClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")

// Strictly speaking no Pipeline is needed at this point,
// but a single-stage Pipeline keeps the same API as before
val pipeline = new Pipeline()
  .setStages(Array(dt))

and moved outside the loop.

Note that you should provide a strategy for handling labels unseen by the StringIndexer, especially with a small dataset like this one.
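
For instance, a minimal sketch of two common strategies (which one, if either, is appropriate depends on your data; the commented-out variant is a general workaround, not something prescribed above):

import org.apache.spark.ml.feature.StringIndexer

// Strategy 1: drop rows whose label was never seen when the indexer was fitted
// (the default handleInvalid value is "error", which throws instead)
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .setHandleInvalid("skip")
  .fit(trainData)

// Strategy 2: fit the indexer on the full dataset so every label is known up front
// val labelIndexer = new StringIndexer()
//   .setInputCol("label")
//   .setOutputCol("indexedLabel")
//   .fit(data)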

Since the iris dataset contains no categorical features, VectorIndexer is redundant here, and DecisionTreeClassifier can handle numeric labels with consecutive values without indexing (at least in Spark 2.0 and later).

Finally, the loop can be simplified to:

val (models, accuracies) = (0 until folds).map { fold =>
  val train = foldedData.patch(fold, Nil, 1).reduce(_ union _)
  val test = foldedData(fold)
  val model = pipeline.fit(train)
  val predictions = model.transform(test)
  (model, evaluator1.evaluate(predictions))
}.unzip
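
From there, picking the best fold's model is just a lookup (a follow-up sketch reusing models and accuracies from the snippet above):

// Index of the fold with the highest accuracy
val bestFold = accuracies.zipWithIndex.maxBy(_._1)._2
val bestModel = models(bestFold)
println(s"Best fold: $bestFold, accuracy: ${accuracies(bestFold)}")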

If you really want all the preprocessing steps with indexing and encoding, it makes more sense to apply them only once.
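
For example, a minimal sketch of indexing the labels a single time before folding (names like labelIndexerModel, indexedTrain and foldedIndexed are just for illustration; trainData and folds come from the question's code):

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.functions.rand

// Fit and apply the label indexer once, up front
val labelIndexerModel = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(trainData)

val indexedTrain = labelIndexerModel.transform(trainData)

// Fold the already indexed data; each per-fold pipeline then only needs the classifier
val foldedIndexed = indexedTrain.orderBy(rand()).randomSplit(Array.fill(folds)(1.0 / folds))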