One-hot encoding multiple variables with Spark 2.1.1
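I need to use Spark 2.1.1, and I have a simple ML use case in which I fit a logistic regression to classify rows based on both continuous and categorical variables.
I detect the categorical variables automatically and index them in an ML pipeline. However, when I then try to apply one-hot encoding to each of these variables (the oneHotEncodersStages value in the code below), the line that creates the pipeline fails to compile with this error: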
Error:(48, 118) type mismatch;
 found   : Array[java.io.Serializable]
 required: Array[_ <: org.apache.spark.ml.PipelineStage]
Note: java.io.Serializable >: org.apache.spark.ml.PipelineStage, but class Array is invariant in type T.
You may wish to investigate a wildcard type such as _ >: org.apache.spark.ml.PipelineStage. (SLS 3.2.10)
val pipeline = new Pipeline().setStages(stringIndexerStages :+ oneHotEncodersStages :+ indexer :+ assembler :+ lr :+ indexToLabel)
I can't find a way to fix this error. Any hints? Here is a minimal example that reproduces it:
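import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{IndexToString, OneHotEncoder, StringIndexer, VectorAssembler}
import spark.implicits._ // requires an existing SparkSession named spark (as in spark-shell)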
val df = Seq(
("automatic","Honda",200,"Cheap"),
("semi-automatic","Ford",240,"Expensive")
).toDF("cat_type","cat_brand","Speed","label")
def onlyFeatureCols(c: String): Boolean = !(c matches "id|label") // Function to select only feature columns (omit id and label)
def isCateg(c: String): Boolean = c.startsWith("cat")
def categNewCol(c: String): String = if (isCateg(c)) s"idx_${c}" else c
def isIdx(c: String): Boolean = c.startsWith("idx")
def oneHotNewCol(c: String): String = if (isIdx(c)) s"vec_${c}" else c
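// Columns the assembler should use: each categorical column is replaced by its one-hot vector column
val featuresNames = df.columns
.filter(onlyFeatureCols)
.map(categNewCol)
.map(oneHotNewCol)
val stringIndexerStages = df.columns.filter(isCateg)
.map(c => new StringIndexer()
.setInputCol(c)
.setOutputCol(categNewCol(c))
.fit(df.select(c))
)
// The idx_* columns are produced by the StringIndexers and are not in df.columns,
// so derive their names from the categorical columns
val oneHotEncodersStages = df.columns.filter(isCateg)
.map(categNewCol)
.map(c => new OneHotEncoder()
.setInputCol(c)
.setOutputCol(oneHotNewCol(c)))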
val indexer = new StringIndexer().setInputCol("label").setOutputCol("labels").fit(df)
val indexToLabel = new IndexToString().setInputCol("prediction").setOutputCol("predicted_label").setLabels(indexer.labels)
val assembler = new VectorAssembler().setInputCols(featuresNames).setOutputCol("features")
val lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("labels")
val pipeline = new Pipeline().setStages(stringIndexerStages :+ oneHotEncodersStages :+ indexer :+ assembler :+ lr :+ indexToLabel)
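stringIndexerStages and oneHotEncodersStages are both arrays. stringIndexerStages :+ oneHotEncodersStages creates a new array in which the whole second array is appended as a single element, so the element type widens to a common supertype of StringIndexerModel and Array[OneHotEncoder], which the compiler infers as java.io.Serializable (hence the Array[java.io.Serializable] in the error). java.io.Serializable is not a subtype of PipelineStage, so that array cannot be passed to setStages. Use "++" instead of ":+" to concatenate the two arrays; the remaining stages are single objects, so ":+" is still correct for them: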
val pipeline = new Pipeline().setStages(stringIndexerStages ++ oneHotEncodersStages :+ indexer :+ assembler :+ lr :+ indexToLabel)
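The difference between the two operators can be reproduced in plain Scala (2.11 or later) without Spark. This is a minimal sketch: Stage, IndexerModel, Encoder and Assembler are hypothetical stand-ins for PipelineStage, StringIndexerModel, OneHotEncoder and VectorAssembler, and the local setStages method only mimics the parameter type of Pipeline.setStages.

```scala
// Hypothetical stand-ins for Spark's classes: only the shape of the type hierarchy matters here.
// Like Spark's PipelineStage, Stage is a java.io.Serializable.
abstract class Stage extends java.io.Serializable
class IndexerModel extends Stage
class Encoder extends Stage
class Assembler extends Stage

object AppendVsConcat {
  val indexers: Array[IndexerModel] = Array(new IndexerModel, new IndexerModel)
  val encoders: Array[Encoder] = Array(new Encoder, new Encoder)

  // ":+" appends its argument as ONE element. The common supertype of IndexerModel and
  // Array[Encoder] is java.io.Serializable, the same type reported in the error message.
  val appended: Array[java.io.Serializable] = indexers :+ encoders

  // "++" concatenates element by element, so the element type stays a subtype of Stage.
  val concatenated = indexers ++ encoders :+ new Assembler

  // Going through a covariant Seq lets us state the element type explicitly.
  val stages: Array[Stage] = (indexers.toSeq ++ encoders.toSeq :+ new Assembler).toArray

  // Hypothetical helper with the same parameter type as Pipeline.setStages.
  def setStages(value: Array[_ <: Stage]): Int = value.length

  def main(args: Array[String]): Unit = {
    println(appended.length)         // 3: two indexers plus the encoders array as a single element
    println(setStages(concatenated)) // 5
    println(setStages(stages))       // 5
    // setStages(indexers :+ encoders) does not compile: Array[java.io.Serializable] is not an Array[_ <: Stage]
  }
}
```

Building the stages as a Seq and converting with toArray is a slightly more defensive variant: Seq is covariant, so the element type can be fixed explicitly (with the real classes, Seq[PipelineStage]) instead of depending on what "++" infers for a mix of models, transformers and estimators.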