How to create correct data frame for classification in Spark ML
I am trying to run a random forest classification using the Spark ML API, but I am having issues creating the right data frame input for the pipeline.
Here is the sample data:
age,hours_per_week,education,sex,salaryRange
38,40,"hs-grad","male","A"
28,40,"bachelors","female","A"
52,45,"hs-grad","male","B"
31,50,"masters","female","B"
42,40,"bachelors","male","B"
age and hours_per_week are integers, while the other features, including the label salaryRange, are categorical (String).
Loading this csv file (let's call it sample.csv) can be done with the Spark CSV library like this:
val data = sqlContext.csvFile("/home/dusan/sample.csv")
By default all columns are imported as String, so we need to change "age" and "hours_per_week" to Int:
val toInt = udf[Int, String]( _.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))
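As an aside, the same conversion can be done without a udf by casting the columns; a minimal sketch using Column.cast (column names taken from the sample above):

import org.apache.spark.sql.types.IntegerType

// cast the two string columns to integers in place
val dataFixed = data
  .withColumn("age", data("age").cast(IntegerType))
  .withColumn("hours_per_week", data("hours_per_week").cast(IntegerType))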
Just to check how the schema looks now:
scala> dataFixed.printSchema
root
|-- age: integer (nullable = true)
|-- hours_per_week: integer (nullable = true)
|-- education: string (nullable = true)
|-- sex: string (nullable = true)
|-- salaryRange: string (nullable = true)
Then let's set up the cross validator and the pipeline:
val rf = new RandomForestClassifier()
val pipeline = new Pipeline().setStages(Array(rf))
val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)
An error shows up when running this line:
val cmModel = cv.fit(dataFixed)
java.lang.IllegalArgumentException: Field "features" does not exist.
It is possible to set the label column and the feature column in RandomForestClassifier, however I have 4 columns as predictors (features), not just one.
How should I organize my data frame so that it has label and feature columns organized correctly?
For convenience, here is the full code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.{Vector, Vectors}

object SampleClassification {

  def main(args: Array[String]): Unit = {

    //set spark context
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    import com.databricks.spark.csv._

    //load data by using databricks "Spark CSV Library"
    val data = sqlContext.csvFile("/home/dusan/sample.csv")

    //by default all columns are imported as string so we need to change "age" and "hours_per_week" to Int
    val toInt = udf[Int, String]( _.toInt)
    val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week", toInt(data("hours_per_week")))

    val rf = new RandomForestClassifier()
    val pipeline = new Pipeline().setStages(Array(rf))
    val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)

    // this fails with error
    // java.lang.IllegalArgumentException: Field "features" does not exist.
    val cmModel = cv.fit(dataFixed)
  }
}
Thanks for help!
According to the spark documentation on mllib - random trees, it seems to me that you should define the feature map you are using, and the points should be labeled points.
That will tell the algorithm which column should be used as the prediction and which ones are the features.
https://spark.apache.org/docs/latest/mllib-decision-tree.html
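For illustration, here is a minimal sketch of what that mllib-style input could look like for the sample data above, reusing the dataFixed from the question; the category encodings and the training parameters are hypothetical choices, not part of this answer:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest

// each row becomes a LabeledPoint: the label first, then the feature vector
val labeledPoints = dataFixed.map { row =>
  val label = if (row.getString(4) == "A") 0.0 else 1.0
  val education = row.getString(2) match {
    case "hs-grad"   => 0.0
    case "bachelors" => 1.0
    case "masters"   => 2.0
  }
  val sex = if (row.getString(3) == "male") 0.0 else 1.0
  LabeledPoint(label, Vectors.dense(row.getInt(0), row.getInt(1), education, sex))
}

// categoricalFeaturesInfo maps feature index -> number of categories, so the
// algorithm treats education (3 levels) and sex (2 levels) as categorical
val model = RandomForest.trainClassifier(labeledPoints, numClasses = 2,
  categoricalFeaturesInfo = Map(2 -> 3, 3 -> 2),
  numTrees = 10, featureSubsetStrategy = "auto",
  impurity = "gini", maxDepth = 5, maxBins = 32)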
You simply need to make sure that you have a "features" column in your dataframe that is of type VectorUDT, as shown below:
scala> val df2 = dataFixed.withColumnRenamed("age", "features")
df2: org.apache.spark.sql.DataFrame = [features: int, hours_per_week: int, education: string, sex: string, salaryRange: string]
scala> val cmModel = cv.fit(df2)
java.lang.IllegalArgumentException: requirement failed: Column features must be of type org.apache.spark.mllib.linalg.VectorUDT@1eef but was actually IntegerType.
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:50)
at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:118)
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema.apply(Pipeline.scala:164)
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema.apply(Pipeline.scala:164)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:164)
at org.apache.spark.ml.tuning.CrossValidator.transformSchema(CrossValidator.scala:142)
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
at org.apache.spark.ml.tuning.CrossValidator.fit(CrossValidator.scala:107)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76)
EDIT1
Essentially, there must be two fields in your data frame: "features" for the feature vector and "label" for the instance labels. The instance label must be of type Double.
To create a "features" field of Vector type, first create a udf as shown below:
val toVec4 = udf[Vector, Int, Int, String, String] { (a, b, c, d) =>
  // encode the categorical strings as numeric indices
  // (an unseen value would throw a MatchError here)
  val e3 = c match {
    case "hs-grad"   => 0
    case "bachelors" => 1
    case "masters"   => 2
  }
  val e4 = d match {
    case "male"   => 0
    case "female" => 1
  }
  Vectors.dense(a, b, e3, e4)
}
Now to encode the "label" field as well, create another udf as shown below:
val encodeLabel = udf[Double, String]( _ match { case "A" => 0.0 case "B" => 1.0} )
Now we transform the original dataframe using these two udfs:
val df = dataFixed.withColumn(
  "features",
  toVec4(
    dataFixed("age"),
    dataFixed("hours_per_week"),
    dataFixed("education"),
    dataFixed("sex")
  )
).withColumn("label", encodeLabel(dataFixed("salaryRange"))).select("features", "label")
Note that there can be extra columns/fields present in the data frame, but in this case I have selected only features and label:
scala> df.show()
+-------------------+-----+
| features|label|
+-------------------+-----+
|[38.0,40.0,0.0,0.0]| 0.0|
|[28.0,40.0,1.0,1.0]| 0.0|
|[52.0,45.0,0.0,0.0]| 1.0|
|[31.0,50.0,2.0,1.0]| 1.0|
|[42.0,40.0,1.0,0.0]| 1.0|
+-------------------+-----+
Now it is up to you to set the correct parameters for your learning algorithm to make it work.
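For example (a sketch, not part of the original answer; the grid values are arbitrary), the CrossValidator from the question also needs an explicit parameter grid to search over, reusing the rf and pipeline defined earlier:

import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// give the CrossValidator a (hypothetical) grid of parameters to try
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.maxDepth, Array(5, 10))
  .addGrid(rf.numTrees, Array(20, 50))
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(10)

val cvModel = cv.fit(df) // df now has the "features" and "label" columns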
Starting with Spark 1.4 you can use the Transformer org.apache.spark.ml.feature.VectorAssembler.
Just provide the column names you want to be features.
val assembler = new VectorAssembler()
.setInputCols(Array("col1", "col2", "col3"))
.setOutputCol("features")
And add it to your pipeline.
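Putting it together for the sample data above (a sketch assuming the column names from the question; using StringIndexer to encode the categorical strings before assembling is my addition, not part of this answer):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.classification.RandomForestClassifier

// index the categorical string columns into numeric ones
val eduIndexer = new StringIndexer().setInputCol("education").setOutputCol("educationIndex")
val sexIndexer = new StringIndexer().setInputCol("sex").setOutputCol("sexIndex")
val labelIndexer = new StringIndexer().setInputCol("salaryRange").setOutputCol("label")

// merge the numeric columns into the single "features" vector column
val assembler = new VectorAssembler()
  .setInputCols(Array("age", "hours_per_week", "educationIndex", "sexIndex"))
  .setOutputCol("features")

val rf = new RandomForestClassifier().setLabelCol("label").setFeaturesCol("features")

val pipeline = new Pipeline().setStages(Array(eduIndexer, sexIndexer, labelIndexer, assembler, rf))
val model = pipeline.fit(dataFixed)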