Spark 多类分类示例
Spark Multiclass Classification Example
你们知道我在哪里可以找到 Spark 中多类分类的示例吗?我花了很多时间在书籍和网络上搜索,到目前为止我只知道这是可能的,因为根据文档是最新版本。
ML
(Spark 2.0+推荐)
我们将使用与下面 MLlib 中相同的数据。有两个基本选项。如果Estimator
支持开箱即用的多类分类(例如随机森林),您可以直接使用它:
val trainRawDf = trainRaw.toDF
import org.apache.spark.ml.feature.{Tokenizer, CountVectorizer, StringIndexer}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
val transformers = Array(
new StringIndexer().setInputCol("group").setOutputCol("label"),
new Tokenizer().setInputCol("text").setOutputCol("tokens"),
new CountVectorizer().setInputCol("tokens").setOutputCol("features")
)
val rf = new RandomForestClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
val model = new Pipeline().setStages(transformers :+ rf).fit(trainRawDf)
model.transform(trainRawDf)
如果模型仅支持二元分类(逻辑回归)并扩展 o.a.s.ml.classification.Classifier
您可以使用一对一策略:
import org.apache.spark.ml.classification.OneVsRest
import org.apache.spark.ml.classification.LogisticRegression
val lr = new LogisticRegression()
.setLabelCol("label")
.setFeaturesCol("features")
val ovr = new OneVsRest().setClassifier(lr)
val ovrModel = new Pipeline().setStages(transformers :+ ovr).fit(trainRawDf)
MLLib
根据 official documentation 目前 (MLlib 1.6.0) 以下方法支持多类分类:
- 逻辑回归,
- 决策树,
- 随机森林,
- 朴素贝叶斯
至少一些示例使用了多类分类:
- Naive Bayes example - 3 类
- Logistic regression - 10 类 用于分类器,尽管示例数据中只有 2 个
忽略方法特定参数的通用框架与 MLlib 中的所有其他方法几乎相同。您必须预处理您的输入以创建包含代表 label
和 features
:
的列的任一数据框
root
|-- label: double (nullable = true)
|-- features: vector (nullable = true)
或RDD[LabeledPoint]
.
Spark 提供了范围广泛的有用工具,旨在促进这一过程,包括 Feature Extractors and Feature Transformers and pipelines。
您会在下面找到一个使用随机森林的相当简单的示例。
首先让我们导入所需的包并创建虚拟数据:
import sqlContext.implicits._
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
case class LabeledRecord(group: String, text: String)
val trainRaw = sc.parallelize(
LabeledRecord("foo", "foo v a y b foo") ::
LabeledRecord("bar", "x bar y bar v") ::
LabeledRecord("bar", "x a y bar z") ::
LabeledRecord("foobar", "foo v b bar z") ::
LabeledRecord("foo", "foo x") ::
LabeledRecord("foobar", "z y x foo a b bar v") ::
Nil
)
现在让我们定义所需的转换器和流程Dataset
:
// Tokenizer to process text fields
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
// HashingTF to convert tokens to the feature vector
val hashingTF = new HashingTF()
.setInputCol("words")
.setOutputCol("features")
.setNumFeatures(10)
// Indexer to convert String labels to Double
val indexer = new StringIndexer()
.setInputCol("group")
.setOutputCol("label")
.fit(trainRaw.toDF)
def transfom(rdd: RDD[LabeledRecord]) = {
val tokenized = tokenizer.transform(rdd.toDF)
val hashed = hashingTF.transform(tokenized)
val indexed = indexer.transform(hashed)
indexed
.select($"label", $"features")
.map{case Row(label: Double, features: Vector) =>
LabeledPoint(label, features)}
}
val train: RDD[LabeledPoint] = transfom(trainRaw)
请注意indexer
在火车数据上是"fitted"。它只是意味着将用作标签的分类值转换为 doubles
。要在新数据上使用分类器,您必须先使用此 indexer
.
对其进行转换
接下来我们可以训练射频模型:
val numClasses = 3
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 10
val featureSubsetStrategy = "auto"
val impurity = "gini"
val maxDepth = 4
val maxBins = 16
val model = RandomForest.trainClassifier(
train, numClasses, categoricalFeaturesInfo,
numTrees, featureSubsetStrategy, impurity,
maxDepth, maxBins
)
最后测试一下:
val testRaw = sc.parallelize(
LabeledRecord("foo", "foo foo z z z") ::
LabeledRecord("bar", "z bar y y v") ::
LabeledRecord("bar", "a a bar a z") ::
LabeledRecord("foobar", "foo v b bar z") ::
LabeledRecord("foobar", "a foo a bar") ::
Nil
)
val test: RDD[LabeledPoint] = transfom(testRaw)
val predsAndLabs = test.map(lp => (model.predict(lp.features), lp.label))
val metrics = new MulticlassMetrics(predsAndLabs)
metrics.precision
metrics.recall
您使用的是 Spark 1.6 而不是 Spark 2.1?
我认为问题在于,在 spark 2.1 中,转换方法 return 是一个数据集,它可以隐式转换为类型化 RDD,而在此之前,它 return 是一个数据框或行。
尝试将转换函数的 return 类型指定为 RDD[LabeledPoint] 作为诊断,看看是否会出现相同的错误。
你们知道我在哪里可以找到 Spark 中多类分类的示例吗?我花了很多时间在书籍和网络上搜索,到目前为止我只知道这是可能的,因为根据文档是最新版本。
ML
(Spark 2.0+推荐)
我们将使用与下面 MLlib 中相同的数据。有两个基本选项。如果Estimator
支持开箱即用的多类分类(例如随机森林),您可以直接使用它:
val trainRawDf = trainRaw.toDF
import org.apache.spark.ml.feature.{Tokenizer, CountVectorizer, StringIndexer}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
val transformers = Array(
new StringIndexer().setInputCol("group").setOutputCol("label"),
new Tokenizer().setInputCol("text").setOutputCol("tokens"),
new CountVectorizer().setInputCol("tokens").setOutputCol("features")
)
val rf = new RandomForestClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
val model = new Pipeline().setStages(transformers :+ rf).fit(trainRawDf)
model.transform(trainRawDf)
如果模型仅支持二元分类(逻辑回归)并扩展 o.a.s.ml.classification.Classifier
您可以使用一对一策略:
import org.apache.spark.ml.classification.OneVsRest
import org.apache.spark.ml.classification.LogisticRegression
val lr = new LogisticRegression()
.setLabelCol("label")
.setFeaturesCol("features")
val ovr = new OneVsRest().setClassifier(lr)
val ovrModel = new Pipeline().setStages(transformers :+ ovr).fit(trainRawDf)
MLLib
根据 official documentation 目前 (MLlib 1.6.0) 以下方法支持多类分类:
- 逻辑回归,
- 决策树,
- 随机森林,
- 朴素贝叶斯
至少一些示例使用了多类分类:
- Naive Bayes example - 3 类
- Logistic regression - 10 类 用于分类器,尽管示例数据中只有 2 个
忽略方法特定参数的通用框架与 MLlib 中的所有其他方法几乎相同。您必须预处理您的输入以创建包含代表 label
和 features
:
root
|-- label: double (nullable = true)
|-- features: vector (nullable = true)
或RDD[LabeledPoint]
.
Spark 提供了范围广泛的有用工具,旨在促进这一过程,包括 Feature Extractors and Feature Transformers and pipelines。
您会在下面找到一个使用随机森林的相当简单的示例。
首先让我们导入所需的包并创建虚拟数据:
import sqlContext.implicits._
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
case class LabeledRecord(group: String, text: String)
val trainRaw = sc.parallelize(
LabeledRecord("foo", "foo v a y b foo") ::
LabeledRecord("bar", "x bar y bar v") ::
LabeledRecord("bar", "x a y bar z") ::
LabeledRecord("foobar", "foo v b bar z") ::
LabeledRecord("foo", "foo x") ::
LabeledRecord("foobar", "z y x foo a b bar v") ::
Nil
)
现在让我们定义所需的转换器和流程Dataset
:
// Tokenizer to process text fields
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
// HashingTF to convert tokens to the feature vector
val hashingTF = new HashingTF()
.setInputCol("words")
.setOutputCol("features")
.setNumFeatures(10)
// Indexer to convert String labels to Double
val indexer = new StringIndexer()
.setInputCol("group")
.setOutputCol("label")
.fit(trainRaw.toDF)
def transfom(rdd: RDD[LabeledRecord]) = {
val tokenized = tokenizer.transform(rdd.toDF)
val hashed = hashingTF.transform(tokenized)
val indexed = indexer.transform(hashed)
indexed
.select($"label", $"features")
.map{case Row(label: Double, features: Vector) =>
LabeledPoint(label, features)}
}
val train: RDD[LabeledPoint] = transfom(trainRaw)
请注意indexer
在火车数据上是"fitted"。它只是意味着将用作标签的分类值转换为 doubles
。要在新数据上使用分类器,您必须先使用此 indexer
.
接下来我们可以训练射频模型:
val numClasses = 3
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 10
val featureSubsetStrategy = "auto"
val impurity = "gini"
val maxDepth = 4
val maxBins = 16
val model = RandomForest.trainClassifier(
train, numClasses, categoricalFeaturesInfo,
numTrees, featureSubsetStrategy, impurity,
maxDepth, maxBins
)
最后测试一下:
val testRaw = sc.parallelize(
LabeledRecord("foo", "foo foo z z z") ::
LabeledRecord("bar", "z bar y y v") ::
LabeledRecord("bar", "a a bar a z") ::
LabeledRecord("foobar", "foo v b bar z") ::
LabeledRecord("foobar", "a foo a bar") ::
Nil
)
val test: RDD[LabeledPoint] = transfom(testRaw)
val predsAndLabs = test.map(lp => (model.predict(lp.features), lp.label))
val metrics = new MulticlassMetrics(predsAndLabs)
metrics.precision
metrics.recall
您使用的是 Spark 1.6 而不是 Spark 2.1? 我认为问题在于,在 spark 2.1 中,转换方法 return 是一个数据集,它可以隐式转换为类型化 RDD,而在此之前,它 return 是一个数据框或行。
尝试将转换函数的 return 类型指定为 RDD[LabeledPoint] 作为诊断,看看是否会出现相同的错误。