如何在 mllib 中准备训练数据
How to prepare for training data in mllib
TL;DR;
我如何使用 mllib
训练我的 wiki 数据(文本和类别)以针对推文进行预测?
我无法弄清楚如何转换我的标记化 wiki 数据,以便可以通过 NaiveBayes
或 LogisticRegression
对其进行训练。我的目标是使用经过训练的模型与推文*进行比较。我已经尝试将管道与 LR 和 HashingTF
与 IDF
一起用于 NaiveBayes
,但我总是得到错误的预测。这是我尝试过的:
*请注意,我想将 wiki 数据中的许多类别用于我的标签...我只见过二进制分类(它是一个类别或另一个类别)...是否可以做什么我要?
带 LR 的管道
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.ml.feature.RegexTokenizer
case class WikiData(category: String, text: String)
case class LabeledData(category: String, text: String, label: Double)
val wikiData = sc.parallelize(List(WikiData("Spark", "this is about spark"), WikiData("Hadoop","then there is hadoop")))
val categoryMap = wikiData.map(x=>x.category).distinct.zipWithIndex.mapValues(x=>x.toDouble/1000).collectAsMap
val labeledData = wikiData.map(x=>LabeledData(x.category, x.text, categoryMap.get(x.category).getOrElse(0.0))).toDF
val tokenizer = new RegexTokenizer()
.setInputCol("text")
.setOutputCol("words")
.setPattern("/W+")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.01)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
val model = pipeline.fit(labeledData)
model.transform(labeledData).show
朴素贝叶斯
val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documentsAsWordSequenceAlready)
import org.apache.spark.mllib.feature.IDF
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
tf.cache()
val idf = new IDF(minDocFreq = 2).fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
//to create tfidfLabeled (below) I ran a map set the labels...but again it seems to have to be 1.0 or 0.0?
NaiveBayes.train(tfidfLabeled)
.predict(hashingTF.transform(tweet))
.collect
ML LogisticRegression
尚不支持多项分类,但 MLLib NaiveBayes
和 LogisticRegressionWithLBFGS
都支持它。在第一种情况下,它应该默认工作:
import org.apache.spark.mllib.classification.NaiveBayes
val nbModel = new NaiveBayes()
.setModelType("multinomial") // This is default value
.run(train)
但对于逻辑回归,您应该提供一些 类:
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
val model = new LogisticRegressionWithLBFGS()
.setNumClasses(n) // Set number of classes
.run(trainingData)
关于预处理步骤,这是一个相当广泛的话题,如果不访问您的数据就很难给您一个有意义的建议,所以您在下面找到的所有内容都只是一个大胆的猜测:
- 据我了解,您使用 wiki 数据进行训练,使用推文进行测试。如果这是真的,那通常是个坏主意。您可以预料到,两组使用的词汇、语法和拼写明显不同
- 简单的正则表达式分词器可以在标准化文本上表现出色,但根据我的经验,它在推文等非正式文本上效果不佳
HashingTF
可能是获取基线模型的好方法,但它是一种极其简化的方法,尤其是在您不应用任何过滤步骤的情况下。如果您决定使用它,您至少应该增加功能数量或使用默认值 (2^20)
编辑(使用 IDF 为朴素贝叶斯准备数据)
- 使用 ML 管道:
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.ml.feature.IDF
import org.apache.spark.sql.Row
val tokenizer = ???
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("rawFeatures")
val idf = new IDF()
.setInputCol(hashingTF.getOutputCol)
.setOutputCol("features")
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, idf))
val model = pipeline.fit(labeledData)
model
.transform(labeledData)
.select($"label", $"features")
.map{case Row(label: Double, features: Vector) => LabeledPoint(label, features)}
- 使用 MLlib 转换器:
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.feature.{IDF, IDFModel}
val labeledData = wikiData.map(x =>
LabeledData(x.category, x.text, categoryMap.get(x.category).getOrElse(0.0)))
val p = "\W+".r
val raw = labeledData.map{
case LabeledData(_, text, label) => (label, p.split(text))}
val hashingTF: org.apache.spark.mllib.feature.HashingTF = new HashingTF(1000)
val tf = raw.map{case (label, text) => (label, hashingTF.transform(text))}
val idf: org.apache.spark.mllib.feature.IDFModel = new IDF().fit(tf.map(_._2))
tf.map{
case (label, rawFeatures) => LabeledPoint(label, idf.transform(rawFeatures))}
注意:由于转换器需要 JVM 访问权限,因此 MLlib 版本在 PySpark 中不起作用。如果您更喜欢 Python,则必须 .
编辑(为机器学习算法准备数据):
虽然下面的一段代码乍一看是有效的
val categoryMap = wikiData
.map(x=>x.category)
.distinct
.zipWithIndex
.mapValues(x=>x.toDouble/1000)
.collectAsMap
val labeledData = wikiData.map(x=>LabeledData(
x.category, x.text, categoryMap.get(x.category).getOrElse(0.0))).toDF
它不会为 ML
算法生成有效标签。
首先,ML
期望标签在 (0.0, 1.0, ..., n.0) 中,其中 n 是 类 的数量。如果您的示例管道中 类 之一获得标签 0.001,您将收到如下错误:
ERROR LogisticRegression: Classification labels should be in {0 to 0 Found 1 invalid labels.
显而易见的解决方案是在生成映射时避免除法
.mapValues(x=>x.toDouble)
虽然它适用于 LogisticRegression
,但其他 ML
算法仍然会失败。例如 RandomForestClassifier
你会得到
RandomForestClassifier was given input with invalid label column label, without the number of classes specified. See StringIndexer.
RandomForestClassifier
的 ML 版本与它的 MLlib
版本不同,它没有提供设置 类 数量的方法。原来它期望在 DataFrame
列上设置特殊属性。最简单的方法是使用错误信息中提到的StringIndexer
:
import org.apache.spark.ml.feature.StringIndexer
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("label")
val pipeline = new Pipeline()
.setStages(Array(indexer, tokenizer, hashingTF, idf, lr))
val model = pipeline.fit(wikiData.toDF)
TL;DR;
我如何使用 mllib
训练我的 wiki 数据(文本和类别)以针对推文进行预测?
我无法弄清楚如何转换我的标记化 wiki 数据,以便可以通过 NaiveBayes
或 LogisticRegression
对其进行训练。我的目标是使用经过训练的模型与推文*进行比较。我已经尝试将管道与 LR 和 HashingTF
与 IDF
一起用于 NaiveBayes
,但我总是得到错误的预测。这是我尝试过的:
*请注意,我想将 wiki 数据中的许多类别用于我的标签...我只见过二进制分类(它是一个类别或另一个类别)...是否可以做什么我要?
带 LR 的管道
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.ml.feature.RegexTokenizer
case class WikiData(category: String, text: String)
case class LabeledData(category: String, text: String, label: Double)
val wikiData = sc.parallelize(List(WikiData("Spark", "this is about spark"), WikiData("Hadoop","then there is hadoop")))
val categoryMap = wikiData.map(x=>x.category).distinct.zipWithIndex.mapValues(x=>x.toDouble/1000).collectAsMap
val labeledData = wikiData.map(x=>LabeledData(x.category, x.text, categoryMap.get(x.category).getOrElse(0.0))).toDF
val tokenizer = new RegexTokenizer()
.setInputCol("text")
.setOutputCol("words")
.setPattern("/W+")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.01)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
val model = pipeline.fit(labeledData)
model.transform(labeledData).show
朴素贝叶斯
val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documentsAsWordSequenceAlready)
import org.apache.spark.mllib.feature.IDF
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
tf.cache()
val idf = new IDF(minDocFreq = 2).fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
//to create tfidfLabeled (below) I ran a map set the labels...but again it seems to have to be 1.0 or 0.0?
NaiveBayes.train(tfidfLabeled)
.predict(hashingTF.transform(tweet))
.collect
ML LogisticRegression
尚不支持多项分类,但 MLLib NaiveBayes
和 LogisticRegressionWithLBFGS
都支持它。在第一种情况下,它应该默认工作:
import org.apache.spark.mllib.classification.NaiveBayes
val nbModel = new NaiveBayes()
.setModelType("multinomial") // This is default value
.run(train)
但对于逻辑回归,您应该提供一些 类:
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
val model = new LogisticRegressionWithLBFGS()
.setNumClasses(n) // Set number of classes
.run(trainingData)
关于预处理步骤,这是一个相当广泛的话题,如果不访问您的数据就很难给您一个有意义的建议,所以您在下面找到的所有内容都只是一个大胆的猜测:
- 据我了解,您使用 wiki 数据进行训练,使用推文进行测试。如果这是真的,那通常是个坏主意。您可以预料到,两组使用的词汇、语法和拼写明显不同
- 简单的正则表达式分词器可以在标准化文本上表现出色,但根据我的经验,它在推文等非正式文本上效果不佳
HashingTF
可能是获取基线模型的好方法,但它是一种极其简化的方法,尤其是在您不应用任何过滤步骤的情况下。如果您决定使用它,您至少应该增加功能数量或使用默认值 (2^20)
编辑(使用 IDF 为朴素贝叶斯准备数据)
- 使用 ML 管道:
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.ml.feature.IDF
import org.apache.spark.sql.Row
val tokenizer = ???
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("rawFeatures")
val idf = new IDF()
.setInputCol(hashingTF.getOutputCol)
.setOutputCol("features")
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, idf))
val model = pipeline.fit(labeledData)
model
.transform(labeledData)
.select($"label", $"features")
.map{case Row(label: Double, features: Vector) => LabeledPoint(label, features)}
- 使用 MLlib 转换器:
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.feature.{IDF, IDFModel}
val labeledData = wikiData.map(x =>
LabeledData(x.category, x.text, categoryMap.get(x.category).getOrElse(0.0)))
val p = "\W+".r
val raw = labeledData.map{
case LabeledData(_, text, label) => (label, p.split(text))}
val hashingTF: org.apache.spark.mllib.feature.HashingTF = new HashingTF(1000)
val tf = raw.map{case (label, text) => (label, hashingTF.transform(text))}
val idf: org.apache.spark.mllib.feature.IDFModel = new IDF().fit(tf.map(_._2))
tf.map{
case (label, rawFeatures) => LabeledPoint(label, idf.transform(rawFeatures))}
注意:由于转换器需要 JVM 访问权限,因此 MLlib 版本在 PySpark 中不起作用。如果您更喜欢 Python,则必须
编辑(为机器学习算法准备数据):
虽然下面的一段代码乍一看是有效的
val categoryMap = wikiData
.map(x=>x.category)
.distinct
.zipWithIndex
.mapValues(x=>x.toDouble/1000)
.collectAsMap
val labeledData = wikiData.map(x=>LabeledData(
x.category, x.text, categoryMap.get(x.category).getOrElse(0.0))).toDF
它不会为 ML
算法生成有效标签。
首先,ML
期望标签在 (0.0, 1.0, ..., n.0) 中,其中 n 是 类 的数量。如果您的示例管道中 类 之一获得标签 0.001,您将收到如下错误:
ERROR LogisticRegression: Classification labels should be in {0 to 0 Found 1 invalid labels.
显而易见的解决方案是在生成映射时避免除法
.mapValues(x=>x.toDouble)
虽然它适用于 LogisticRegression
,但其他 ML
算法仍然会失败。例如 RandomForestClassifier
你会得到
RandomForestClassifier was given input with invalid label column label, without the number of classes specified. See StringIndexer.
RandomForestClassifier
的 ML 版本与它的 MLlib
版本不同,它没有提供设置 类 数量的方法。原来它期望在 DataFrame
列上设置特殊属性。最简单的方法是使用错误信息中提到的StringIndexer
:
import org.apache.spark.ml.feature.StringIndexer
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("label")
val pipeline = new Pipeline()
.setStages(Array(indexer, tokenizer, hashingTF, idf, lr))
val model = pipeline.fit(wikiData.toDF)