在多个列上使用 Spark ML 的 OneHotEncoder
Using Spark ML's OneHotEncoder on multiple columns
我已经能够创建一个管道,允许我一次索引多个字符串列,但我在编码它们时遇到了困难,因为与索引不同,编码器不是估计器,所以我从不调用 fit
根据 OneHotEncoder example in the docs.
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler,
OneHotEncoder}
import org.apache.spark.ml.Pipeline
val data = sqlContext.read.parquet("s3n://map2-test/forecaster/intermediate_data")
val df = data.select("win","bid_price","domain","size", "form_factor").na.drop()
//indexing columns
val stringColumns = Array("domain","size", "form_factor")
val index_transformers: Array[org.apache.spark.ml.PipelineStage] = stringColumns.map(
cname => new StringIndexer()
.setInputCol(cname)
.setOutputCol(s"${cname}_index")
)
// Add the rest of your pipeline like VectorAssembler and algorithm
val index_pipeline = new Pipeline().setStages(index_transformers)
val index_model = index_pipeline.fit(df)
val df_indexed = index_model.transform(df)
//encoding columns
val indexColumns = df_indexed.columns.filter(x => x contains "index")
val one_hot_encoders: Array[org.apache.spark.ml.PipelineStage] = indexColumns.map(
cname => new OneHotEncoder()
.setInputCol(cname)
.setOutputCol(s"${cname}_vec")
)
val one_hot_pipeline = new Pipeline().setStages(one_hot_encoders)
val df_encoded = one_hot_pipeline.transform(df_indexed)
OneHotEncoder 对象没有 fit 方法,因此将它放在与索引器相同的管道中将不起作用 - 当我在管道上调用 fit 时它会抛出错误。我也无法在使用流水线阶段数组 one_hot_encoders
制作的流水线上调用转换。
我还没有找到一个很好的解决方案来使用 OneHotEncoder,而不是单独创建和调用转换自身来为我要编码的所有列进行转换
Spark >= 3.0:
在 Spark 3.0 中 OneHotEncoderEstimator
已重命名为 OneHotEncoder
:
import org.apache.spark.ml.feature.{OneHotEncoder, OneHotEncoderModel}
val encoder = new OneHotEncoder()
.setInputCols(indexColumns)
.setOutputCols(indexColumns map (name => s"${name}_vec"))
Spark >= 2.3
Spark 2.3 引入了新的 类 OneHotEncoderEstimator
、OneHotEncoderModel
,即使在 Pipeline
之外使用也需要拟合,并且同时对多个列进行操作。
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, OneHotEncoderModel}
val encoder = new OneHotEncoderEstimator()
.setInputCols(indexColumns)
.setOutputCols(indexColumns map (name => s"${name}_vec"))
encoder.fit(df_indexed).transform(df_indexed)
Spark < 2.3
即使您使用的转换器不需要拟合,您也必须使用 fit
方法创建可用于转换数据的 PipelineModel
。
one_hot_pipeline.fit(df_indexed).transform(df_indexed)
附带说明一下,您可以将索引和编码合并为一个 Pipeline
:
val pipeline = new Pipeline()
.setStages(index_transformers ++ one_hot_encoders)
val model = pipeline.fit(df)
model.transform(df)
编辑:
您看到的错误意味着您的其中一列包含空 String
。它被索引器接受但不能用于编码。根据您的要求,您可以删除这些或使用虚拟标签。很遗憾,在 SPARK-11569) 解决之前,您不能使用 NULLs
。
我已经能够创建一个管道,允许我一次索引多个字符串列,但我在编码它们时遇到了困难,因为与索引不同,编码器不是估计器,所以我从不调用 fit 根据 OneHotEncoder example in the docs.
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler,
OneHotEncoder}
import org.apache.spark.ml.Pipeline
val data = sqlContext.read.parquet("s3n://map2-test/forecaster/intermediate_data")
val df = data.select("win","bid_price","domain","size", "form_factor").na.drop()
//indexing columns
val stringColumns = Array("domain","size", "form_factor")
val index_transformers: Array[org.apache.spark.ml.PipelineStage] = stringColumns.map(
cname => new StringIndexer()
.setInputCol(cname)
.setOutputCol(s"${cname}_index")
)
// Add the rest of your pipeline like VectorAssembler and algorithm
val index_pipeline = new Pipeline().setStages(index_transformers)
val index_model = index_pipeline.fit(df)
val df_indexed = index_model.transform(df)
//encoding columns
val indexColumns = df_indexed.columns.filter(x => x contains "index")
val one_hot_encoders: Array[org.apache.spark.ml.PipelineStage] = indexColumns.map(
cname => new OneHotEncoder()
.setInputCol(cname)
.setOutputCol(s"${cname}_vec")
)
val one_hot_pipeline = new Pipeline().setStages(one_hot_encoders)
val df_encoded = one_hot_pipeline.transform(df_indexed)
OneHotEncoder 对象没有 fit 方法,因此将它放在与索引器相同的管道中将不起作用 - 当我在管道上调用 fit 时它会抛出错误。我也无法在使用流水线阶段数组 one_hot_encoders
制作的流水线上调用转换。
我还没有找到一个很好的解决方案来使用 OneHotEncoder,而不是单独创建和调用转换自身来为我要编码的所有列进行转换
Spark >= 3.0:
在 Spark 3.0 中 OneHotEncoderEstimator
已重命名为 OneHotEncoder
:
import org.apache.spark.ml.feature.{OneHotEncoder, OneHotEncoderModel}
val encoder = new OneHotEncoder()
.setInputCols(indexColumns)
.setOutputCols(indexColumns map (name => s"${name}_vec"))
Spark >= 2.3
Spark 2.3 引入了新的 类 OneHotEncoderEstimator
、OneHotEncoderModel
,即使在 Pipeline
之外使用也需要拟合,并且同时对多个列进行操作。
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, OneHotEncoderModel}
val encoder = new OneHotEncoderEstimator()
.setInputCols(indexColumns)
.setOutputCols(indexColumns map (name => s"${name}_vec"))
encoder.fit(df_indexed).transform(df_indexed)
Spark < 2.3
即使您使用的转换器不需要拟合,您也必须使用 fit
方法创建可用于转换数据的 PipelineModel
。
one_hot_pipeline.fit(df_indexed).transform(df_indexed)
附带说明一下,您可以将索引和编码合并为一个 Pipeline
:
val pipeline = new Pipeline()
.setStages(index_transformers ++ one_hot_encoders)
val model = pipeline.fit(df)
model.transform(df)
编辑:
您看到的错误意味着您的其中一列包含空 String
。它被索引器接受但不能用于编码。根据您的要求,您可以删除这些或使用虚拟标签。很遗憾,在 SPARK-11569) 解决之前,您不能使用 NULLs
。