Cannot run RandomForestClassifier from spark ML on a simple example
I have been trying to run the experimental RandomForestClassifier from the spark.ml package (version 1.5.2). The dataset I am using comes from the LogisticRegression example in the Spark ML guide.
Here is the code:
import org.apache.spark.ml.classification.{LogisticRegression, RandomForestClassifier}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row
// Prepare training data from a list of (label, features) tuples.
val training = sqlContext.createDataFrame(Seq(
(1.0, Vectors.dense(0.0, 1.1, 0.1)),
(0.0, Vectors.dense(2.0, 1.0, -1.0)),
(0.0, Vectors.dense(2.0, 1.3, 1.0)),
(1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")
val rf = new RandomForestClassifier()
val model = rf.fit(training)
Here is the error I get:
java.lang.IllegalArgumentException: RandomForestClassifier was given input with invalid label column label, without the number of classes specified. See StringIndexer.
at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:87)
at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:42)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:63)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:65)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:67)
at $iwC$$iwC$$iwC.<init>(<console>:69)
at $iwC$$iwC.<init>(<console>:71)
at $iwC.<init>(<console>:73)
at <init>(<console>:75)
at .<init>(<console>:79)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at org.apache.spark.repl.SparkIMain.loadAndRunReq(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
The problem occurs when the function tries to compute the number of classes in the column "label".
As you can see at line 84 in the source code of RandomForestClassifier, the function calls DataFrame.schema with the parameter "label". This call works fine and returns an org.apache.spark.sql.types.StructField object.
Then the function org.apache.spark.ml.util.MetadataUtils.getNumClasses is called. As it does not return the expected output, an exception is raised at line 87.
After a quick look at the source code of getNumClasses, I believe the error is due to the fact that the data in the column "label" is neither a BinaryAttribute nor a NominalAttribute. However, I have no idea how to fix this problem.
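For reference, the missing metadata can be inspected directly (a minimal check; an untouched DataFrame should print empty metadata):
// The "label" column produced by toDF carries no ML attribute metadata,
// which is why MetadataUtils.getNumClasses cannot find the number of classes.
println(training.schema("label").metadata)
// {}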
My question: how can I fix this problem?
Thank you very much for reading my question, and for your help!
Let's first fix the imports to remove any ambiguity:
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorIndexer}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.mllib.linalg.Vectors
I will use the same data as you:
val training = sqlContext.createDataFrame(Seq(
(1.0, Vectors.dense(0.0, 1.1, 0.1)),
(0.0, Vectors.dense(2.0, 1.0, -1.0)),
(0.0, Vectors.dense(2.0, 1.3, 1.0)),
(1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")
Then let's create the pipeline stages:
val stages = new scala.collection.mutable.ArrayBuffer[PipelineStage]()
- Re-index the classes for classification (what this does to the column metadata is shown after the output below):
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(training)
- Identify categorical features using VectorIndexer:
val featuresIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(10)
  .fit(training)
stages += featuresIndexer
val tmp = featuresIndexer.transform(labelIndexer.transform(training))
Note that labelIndexer is not added to stages: it is applied up front to build tmp, the DataFrame the pipeline is then fit on.
- Train the random forest:
val rf = new RandomForestClassifier()
  .setFeaturesCol(featuresIndexer.getOutputCol)
  .setLabelCol(labelIndexer.getOutputCol)
stages += rf
val pipeline = new Pipeline().setStages(stages.toArray)
// Fit the Pipeline
val pipelineModel = pipeline.fit(tmp)
val results = pipelineModel.transform(training)
results.show
//+-----+--------------+---------------+-------------+-----------+----------+
//|label| features|indexedFeatures|rawPrediction|probability|prediction|
//+-----+--------------+---------------+-------------+-----------+----------+
//| 1.0| [0.0,1.1,0.1]| [0.0,1.0,2.0]| [1.0,19.0]|[0.05,0.95]| 1.0|
//| 0.0|[2.0,1.0,-1.0]| [1.0,0.0,0.0]| [17.0,3.0]|[0.85,0.15]| 0.0|
//| 0.0| [2.0,1.3,1.0]| [1.0,3.0,3.0]| [14.0,6.0]| [0.7,0.3]| 0.0|
//| 1.0|[0.0,1.2,-0.5]| [0.0,2.0,1.0]| [1.0,19.0]|[0.05,0.95]| 1.0|
//+-----+--------------+---------------+-------------+-----------+----------+
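This fixes the original error because StringIndexer writes nominal attribute metadata onto its output column, which is exactly what MetadataUtils.getNumClasses reads. A quick way to see it (a minimal sketch; the exact metadata string and the ordering of vals may vary across Spark versions):
// indexedLabel carries NominalAttribute metadata declaring its values,
// so RandomForestClassifier can infer the number of classes from the schema.
println(labelIndexer.transform(training).schema("indexedLabel").metadata)
// e.g. {"ml_attr":{"vals":["0.0","1.0"],"type":"nominal","name":"indexedLabel"}}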
References: regarding steps 1 and 2, for those who want more details about feature transformers, I suggest reading the official documentation here.
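As a side note (an alternative sketch, not part of the original answer): since the classifier only needs the schema to declare the number of classes, you could instead attach the nominal metadata to the existing label column by hand, using the org.apache.spark.ml.attribute API:
import org.apache.spark.ml.attribute.NominalAttribute
// Build nominal metadata declaring the two classes and alias the label
// column with it; RandomForestClassifier can then read numClasses directly.
val labelMeta = NominalAttribute.defaultAttr
  .withName("label")
  .withValues("0.0", "1.0")
  .toMetadata()
val trainingWithMeta =
  training.select(training("label").as("label", labelMeta), training("features"))
val model = new RandomForestClassifier().fit(trainingWithMeta)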