Spark: OneHot encoder and storing Pipeline (特征维度问题)
Spark: OneHot encoder and storing Pipeline (feature dimension issue)
我们有一个由多个特征转换阶段组成的管道 (2.0.1)。
其中一些阶段是 OneHot 编码器。思路:将一个整数类分类成n个独立的特征
训练管道模型并使用它进行预测时一切正常。但是,存储经过训练的管道模型并重新加载它会导致问题:
存储的 'trained' OneHot 编码器不会跟踪有多少类别。现在加载它会导致问题:当加载的模型用于预测时,它会重新确定有多少类别,导致训练特征 space 和预测特征 space 具有不同的大小(维度)。在 Zeppelin 笔记本中查看下面的示例代码 运行:
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.PipelineModel
// Specifying two test samples, one with class 5 and one with class 3. This is OneHot encoded into 5 boolean features (sparse vector)
// Adding a 'filler' column because createDataFrame doesnt like single-column sequences and this is the easiest way to demo it ;)
val df = spark.createDataFrame(Seq((5, 1), (3, 1))).toDF("class", "filler")
val enc = new OneHotEncoder()
.setInputCol("class")
.setOutputCol("class_one_hot")
val pipeline = new Pipeline()
.setStages(Array(enc))
val model = pipeline.fit(df)
model.transform(df).show()
/*
+-----+------+-------------+
|class|filler|class_one_hot|
+-----+------+-------------+
| 5| 1|(5,[],[]) |
| 3| 1|(5,[3],[1.0])|
+-----+------+-------------+
Note: Vector of size 5
*/
model.write.overwrite().save("s3a://one-hot")
val loadedModel = PipelineModel.load("s3a://one-hot")
val df2 = spark.createDataFrame(Seq((3, 1))).toDF("class", "output") // When using the trained model our input consists of one row (prediction engine style). The provided category for the prediction feature set is category 3
loadedModel.transform(df2).show()
/*
+-----+------+-------------+
|class|output|class_one_hot|
+-----+------+-------------+
| 3| 1|(3,[],[]) |
+-----+------+-------------+
Note: Incompatible vector of size 3
*/
我不想制作自己的支持此序列化的 OneHot 编码器,是否有任何开箱即用的替代方案?
火花 >= 2.3
Spark 2.3引入了OneHotEncoderEstimator
(Spark 3.0改名为OneHotEncoder
)可以直接使用,支持多列输入
Spark < 2.3
您没有使用 OneHotEncoder
,因为它本来就是要被使用的。 OneHotEncoder
是 Transofrmer
而不是 Estimator
。它不存储有关级别的任何信息,而是依赖于 Column
元数据来确定输出维度。如果缺少元数据,就像您的情况一样,它会使用后备策略并假定存在 max(input_column)
级别。序列化在这里无关紧要。
典型用法涉及上游 Pipeline
中的 Transformers
,它为您设置元数据。一个常见的例子是 StringIndexer
.
手动设置元数据还是可以的,但是比较麻烦:
import org.apache.spark.ml.attribute.NominalAttribute
val meta = NominalAttribute.defaultAttr
.withName("class")
.withValues("0", (1 to 5).map(_.toString): _*)
.toMetadata
loadedModel.transform(df2.select($"class".as("class", meta), $"output"))
与 Python 类似(需要 Spark >= 2.2):
from pyspark.sql.functions import col
meta = {"ml_attr": {
"vals": [str(x) for x in range(6)], # Provide a set of levels
"type": "nominal",
"name": "class"}}
loaded.transform(
df.withColumn("class", col("class").alias("class", metadata=meta))
)
也可以使用多种不同的方法附加元数据:。
我们有一个由多个特征转换阶段组成的管道 (2.0.1)。
其中一些阶段是 OneHot 编码器。思路:将一个整数类分类成n个独立的特征
训练管道模型并使用它进行预测时一切正常。但是,存储经过训练的管道模型并重新加载它会导致问题:
存储的 'trained' OneHot 编码器不会跟踪有多少类别。现在加载它会导致问题:当加载的模型用于预测时,它会重新确定有多少类别,导致训练特征 space 和预测特征 space 具有不同的大小(维度)。在 Zeppelin 笔记本中查看下面的示例代码 运行:
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.PipelineModel
// Specifying two test samples, one with class 5 and one with class 3. This is OneHot encoded into 5 boolean features (sparse vector)
// Adding a 'filler' column because createDataFrame doesnt like single-column sequences and this is the easiest way to demo it ;)
val df = spark.createDataFrame(Seq((5, 1), (3, 1))).toDF("class", "filler")
val enc = new OneHotEncoder()
.setInputCol("class")
.setOutputCol("class_one_hot")
val pipeline = new Pipeline()
.setStages(Array(enc))
val model = pipeline.fit(df)
model.transform(df).show()
/*
+-----+------+-------------+
|class|filler|class_one_hot|
+-----+------+-------------+
| 5| 1|(5,[],[]) |
| 3| 1|(5,[3],[1.0])|
+-----+------+-------------+
Note: Vector of size 5
*/
model.write.overwrite().save("s3a://one-hot")
val loadedModel = PipelineModel.load("s3a://one-hot")
val df2 = spark.createDataFrame(Seq((3, 1))).toDF("class", "output") // When using the trained model our input consists of one row (prediction engine style). The provided category for the prediction feature set is category 3
loadedModel.transform(df2).show()
/*
+-----+------+-------------+
|class|output|class_one_hot|
+-----+------+-------------+
| 3| 1|(3,[],[]) |
+-----+------+-------------+
Note: Incompatible vector of size 3
*/
我不想制作自己的支持此序列化的 OneHot 编码器,是否有任何开箱即用的替代方案?
火花 >= 2.3
Spark 2.3引入了OneHotEncoderEstimator
(Spark 3.0改名为OneHotEncoder
)可以直接使用,支持多列输入
Spark < 2.3
您没有使用 OneHotEncoder
,因为它本来就是要被使用的。 OneHotEncoder
是 Transofrmer
而不是 Estimator
。它不存储有关级别的任何信息,而是依赖于 Column
元数据来确定输出维度。如果缺少元数据,就像您的情况一样,它会使用后备策略并假定存在 max(input_column)
级别。序列化在这里无关紧要。
典型用法涉及上游 Pipeline
中的 Transformers
,它为您设置元数据。一个常见的例子是 StringIndexer
.
手动设置元数据还是可以的,但是比较麻烦:
import org.apache.spark.ml.attribute.NominalAttribute
val meta = NominalAttribute.defaultAttr
.withName("class")
.withValues("0", (1 to 5).map(_.toString): _*)
.toMetadata
loadedModel.transform(df2.select($"class".as("class", meta), $"output"))
与 Python 类似(需要 Spark >= 2.2):
from pyspark.sql.functions import col
meta = {"ml_attr": {
"vals": [str(x) for x in range(6)], # Provide a set of levels
"type": "nominal",
"name": "class"}}
loaded.transform(
df.withColumn("class", col("class").alias("class", metadata=meta))
)
也可以使用多种不同的方法附加元数据: