How can I create an Array of fitted PipelineModels in Spark 2.1.0?
I am trying to store the PipelineModel of each trained decision tree model in an Array. Although I created an array of PipelineModels, I get the following type mismatch error:
<console>:96: error: type mismatch;
found : model.type (with underlying type org.apache.spark.ml.PipelineModel)
required: org.apache.spark.ml.PipelineModel.type
bestModels(i) = model // Here is the problem!!!
Can anyone help me? The iris flowers dataset (in libsvm format) can be found here: https://1drv.ms/u/s!Antm9EMPXrQmgP9zQhgdAdxUBSAtSA. This is the sample code:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.sql.types._
val folds = 10
val data = spark.read.format("libsvm").load("/home/vitrion/Documents/iris.libsvm")
var accuracies = Array.fill(folds)(0.0)
var bestModels = Array.fill(folds)(PipelineModel) // This is the array of PipelineModelS
val Array(trainData, testData) = data.orderBy(rand()).randomSplit(Array(0.7, 0.3), seed = 1234L)
val foldedData = trainData.orderBy(rand()).randomSplit(Array.fill(10)(1.0 / folds))
val evaluator1 = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
for( i <- 0 to folds - 1 ){
  var provTrainData = data.limit(0)
  var provTestData = data.limit(0)
  var foldStr = ""
  for( j <- 0 to folds - 1){
    var str = ""
    if (i != j) {
      provTrainData = provTrainData.union(foldedData(j))
      str = "T"
    } else {
      provTestData = foldedData(i)
      str = "S"
    }
    foldStr += str
  }
  println(foldStr)
  val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(trainData)
  val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(6).fit(trainData)
  val dt = new DecisionTreeClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
  val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
  val pipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))
  val model = pipeline.fit(provTrainData)
  var provPredictions = model.transform(provTestData)
  accuracies(i) = evaluator1.evaluate(provPredictions)
  bestModels(i) = model // Here is the problem!!!
  println("FOLD " + i + "\nAccuracy: " + accuracies(i))
}
The immediate problem is that Array.fill(folds)(PipelineModel) creates an Array[PipelineModel.type] (an array of references to the companion object), not an Array[PipelineModel]. You can use either:
val bestModels: Array[PipelineModel] = Array.ofDim[PipelineModel](folds)
or:
val bestModels: Array[PipelineModel] = Array.fill(folds)(null)
As a side note, var is not needed here. Since the array is itself a mutable object, val works just fine.
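The mismatch can be reproduced without Spark. The following is a minimal sketch, assuming a hypothetical class Model with a companion object (stand-ins for illustration, not Spark types): naming the class in value position refers to the companion object, so the element type becomes Model.type. It also shows that a val array can still have its elements reassigned.

```scala
// Hypothetical stand-in for PipelineModel: a class plus its companion object.
class Model(val name: String)
object Model

object ArrayTypeDemo {
  // `Model` in value position is the companion object, so this is Array[Model.type].
  val wrong: Array[Model.type] = Array.fill(3)(Model)
  // wrong(0) = new Model("a")   // would not compile: required Model.type

  // An array of Model instances, initially filled with null.
  val right: Array[Model] = Array.ofDim[Model](3)

  def main(args: Array[String]): Unit = {
    // `right` is a val, yet its elements can still be assigned.
    right(0) = new Model("fold-0")
    println(right(0).name)
  }
}
```

Only the reference held by a val is fixed; the contents of the array it points to remain mutable, which is why var adds nothing here.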
Furthermore, the Pipeline definition can be simplified to:
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .setHandleInvalid("skip")
val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("features")
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, dt))
or even to a DecisionTreeClassifier alone (adjusting evaluator.labelCol accordingly):
val dt = new DecisionTreeClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
// No need for pipeline here
val pipeline = new Pipeline()
  .setStages(Array(dt))
and moved out of the loop.
Note that you should provide a strategy for handling labels unseen by StringIndexer, especially with a dataset as small as this one.
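Since the iris dataset contains no categorical features, VectorIndexer is redundant, and DecisionTreeClassifier should handle numeric labels with consecutive values without indexing (at least in Spark 2.0 and later).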
Finally, the loop can be simplified to:
val (models, accuracies) = (0 until folds).map { fold => {
  val train = foldedData.patch(fold, Nil, 1).reduce(_ union _)
  val test = foldedData(fold)
  val model = pipeline.fit(train)
  val predictions = model.transform(test)
  (model, evaluator1.evaluate(predictions))
}}.unzip
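The patch / reduce / unzip combination used above may be easier to follow on plain collections. This is a small sketch, assuming toy lists of integers in place of the DataFrame folds (the names FoldDemo, folds, and split are illustrative only): patch(i, Nil, 1) drops fold i, reduce merges the remaining folds, and unzip turns the sequence of pairs into a pair of sequences.

```scala
object FoldDemo {
  // Five toy folds standing in for the output of randomSplit (assumed data).
  val folds: Vector[List[Int]] =
    Vector(List(1, 2), List(3, 4), List(5, 6), List(7, 8), List(9, 10))

  // Replace one element at index i with nothing, merge the rest as the
  // training set, and keep the dropped fold as the test set.
  def split(i: Int): (List[Int], List[Int]) = {
    val train = folds.patch(i, Nil, 1).reduce(_ ++ _)
    (train, folds(i))
  }

  def main(args: Array[String]): Unit = {
    // map yields one (train, test) pair per fold; unzip separates them.
    val (trains, tests) = folds.indices.map(split).unzip
    println(trains(1)) // List(1, 2, 5, 6, 7, 8, 9, 10)
    println(tests(1))  // List(3, 4)
  }
}
```

In the Spark version the same shape applies, with union playing the role of ++ and the pair holding the fitted model and its accuracy.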
If you really want all the preprocessing steps with indexing and encoding, it makes more sense to apply them only once.