将 IndexToString 应用于 Spark 中的特征向量
Applying IndexToString to features vector in Spark
上下文: 我有一个数据框,其中所有分类值都已使用 StringIndexer 编制索引。
val categoricalColumns = df.schema.collect { case StructField(name, StringType, nullable, meta) => name }
val categoryIndexers = categoricalColumns.map {
col => new StringIndexer().setInputCol(col).setOutputCol(s"${col}Indexed")
}
然后我使用 VectorAssembler 向量化所有特征列(包括索引分类列)。
val assembler = new VectorAssembler()
.setInputCols(dfIndexed.columns.diff(List("label") ++ categoricalColumns))
.setOutputCol("features")
在应用分类器和一些额外的步骤之后,我最终得到了一个包含标签、特征和预测的数据框。我想将我的特征向量扩展到单独的列,以便将索引值转换回其原始字符串形式。
val categoryConverters = categoricalColumns.zip(categoryIndexers).map {
colAndIndexer => new IndexToString().setInputCol(s"${colAndIndexer._1}Indexed").setOutputCol(colAndIndexer._1).setLabels(colAndIndexer._2.fit(df).labels)
}
问题:是否有简单的方法,或者以某种方式将预测列附加到测试的最佳方法数据框?
我试过的:
val featureSlicers = categoricalColumns.map {
col => new VectorSlicer().setInputCol("features").setOutputCol(s"${col}Indexed").setNames(Array(s"${col}Indexed"))
}
应用它可以得到我想要的列,但它们是矢量形式(正如它的本意),而不是键入 Double。
编辑:
所需的输出是原始数据框(即分类特征作为字符串而不是索引)和一个额外的列指示预测标签(在我的例子中是 0 或 1)。
例如,假设我的分类器的输出看起来像这样:
+-----+---------+----------+
|label| features|prediction|
+-----+---------+----------+
| 1.0|[0.0,3.0]| 1.0|
+-----+---------+----------+
通过在每个特征上应用 VectorSlicer,我会得到:
+-----+---------+----------+-------------+-------------+
|label| features|prediction|statusIndexed|artistIndexed|
+-----+---------+----------+-------------+-------------+
| 1.0|[0.0,3.0]| 1.0| [0.0]| [3.0]|
+-----+---------+----------+-------------+-------------+
很好,但我需要:
+-----+---------+----------+-------------+-------------+
|label| features|prediction|statusIndexed|artistIndexed|
+-----+---------+----------+-------------+-------------+
| 1.0|[0.0,3.0]| 1.0| 0.0 | 3.0 |
+-----+---------+----------+-------------+-------------+
然后能够使用 IndexToString 并将其转换为:
+-----+---------+----------+-------------+-------------+
|label| features|prediction| status | artist |
+-----+---------+----------+-------------+-------------+
| 1.0|[0.0,3.0]| 1.0| good | Pink Floyd |
+-----+---------+----------+-------------+-------------+
甚至:
+-----+----------+-------------+-------------+
|label|prediction| status | artist |
+-----+----------+-------------+-------------+
| 1.0| 1.0| good | Pink Floyd |
+-----+----------+-------------+-------------+
好吧,这不是一个非常有用的操作,但应该可以使用列元数据和简单的 UDF 提取所需的信息。我假设您的数据已经创建了一个类似于此的管道:
import org.apache.spark.ml.feature.{VectorSlicer, VectorAssembler, StringIndexer}
import org.apache.spark.ml.Pipeline
val df = sc.parallelize(Seq(
(1L, "a", "foo", 1.0), (2L, "b", "bar", 2.0), (3L, "a", "bar", 3.0)
)).toDF("id", "x1", "x2", "x3")
val featureCols = Array("x1", "x2", "x3")
val featureColsIdx = featureCols.map(c => s"${c}_i")
val indexers = featureCols.map(
c => new StringIndexer().setInputCol(c).setOutputCol(s"${c}_i")
)
val assembler = new VectorAssembler()
.setInputCols(featureColsIdx)
.setOutputCol("features")
val slicer = new VectorSlicer()
.setInputCol("features")
.setOutputCol("string_features")
.setNames(featureColsIdx.init)
val transformed = new Pipeline()
.setStages(indexers :+ assembler :+ slicer)
.fit(df)
.transform(df)
首先我们可以从特征中提取所需的元数据:
val meta = transformed.select($"string_features")
.schema.fields.head.metadata
.getMetadata("ml_attr")
.getMetadata("attrs")
.getMetadataArray("nominal")
并将其转换为更易于使用的形式
case class NominalMetadataWrapper(idx: Long, name: String, vals: Array[String])
// In general it could a good idea to make it a broadcast variable
val lookup = meta.map(m => NominalMetadataWrapper(
m.getLong("idx"), m.getString("name"), m.getStringArray("vals")
))
最后一个小UDF:
import scala.util.Try
val transFeatures = udf((v: Vector) => lookup.map{
m => Try(m.vals(v(m.idx.toInt).toInt)).toOption
})
transformed.select(transFeatures($"string_features")).
上下文: 我有一个数据框,其中所有分类值都已使用 StringIndexer 编制索引。
val categoricalColumns = df.schema.collect { case StructField(name, StringType, nullable, meta) => name }
val categoryIndexers = categoricalColumns.map {
col => new StringIndexer().setInputCol(col).setOutputCol(s"${col}Indexed")
}
然后我使用 VectorAssembler 向量化所有特征列(包括索引分类列)。
val assembler = new VectorAssembler()
.setInputCols(dfIndexed.columns.diff(List("label") ++ categoricalColumns))
.setOutputCol("features")
在应用分类器和一些额外的步骤之后,我最终得到了一个包含标签、特征和预测的数据框。我想将我的特征向量扩展到单独的列,以便将索引值转换回其原始字符串形式。
val categoryConverters = categoricalColumns.zip(categoryIndexers).map {
colAndIndexer => new IndexToString().setInputCol(s"${colAndIndexer._1}Indexed").setOutputCol(colAndIndexer._1).setLabels(colAndIndexer._2.fit(df).labels)
}
问题:是否有简单的方法,或者以某种方式将预测列附加到测试的最佳方法数据框?
我试过的:
val featureSlicers = categoricalColumns.map {
col => new VectorSlicer().setInputCol("features").setOutputCol(s"${col}Indexed").setNames(Array(s"${col}Indexed"))
}
应用它可以得到我想要的列,但它们是矢量形式(正如它的本意),而不是键入 Double。
编辑: 所需的输出是原始数据框(即分类特征作为字符串而不是索引)和一个额外的列指示预测标签(在我的例子中是 0 或 1)。
例如,假设我的分类器的输出看起来像这样:
+-----+---------+----------+
|label| features|prediction|
+-----+---------+----------+
| 1.0|[0.0,3.0]| 1.0|
+-----+---------+----------+
通过在每个特征上应用 VectorSlicer,我会得到:
+-----+---------+----------+-------------+-------------+
|label| features|prediction|statusIndexed|artistIndexed|
+-----+---------+----------+-------------+-------------+
| 1.0|[0.0,3.0]| 1.0| [0.0]| [3.0]|
+-----+---------+----------+-------------+-------------+
很好,但我需要:
+-----+---------+----------+-------------+-------------+
|label| features|prediction|statusIndexed|artistIndexed|
+-----+---------+----------+-------------+-------------+
| 1.0|[0.0,3.0]| 1.0| 0.0 | 3.0 |
+-----+---------+----------+-------------+-------------+
然后能够使用 IndexToString 并将其转换为:
+-----+---------+----------+-------------+-------------+
|label| features|prediction| status | artist |
+-----+---------+----------+-------------+-------------+
| 1.0|[0.0,3.0]| 1.0| good | Pink Floyd |
+-----+---------+----------+-------------+-------------+
甚至:
+-----+----------+-------------+-------------+
|label|prediction| status | artist |
+-----+----------+-------------+-------------+
| 1.0| 1.0| good | Pink Floyd |
+-----+----------+-------------+-------------+
好吧,这不是一个非常有用的操作,但应该可以使用列元数据和简单的 UDF 提取所需的信息。我假设您的数据已经创建了一个类似于此的管道:
import org.apache.spark.ml.feature.{VectorSlicer, VectorAssembler, StringIndexer}
import org.apache.spark.ml.Pipeline
val df = sc.parallelize(Seq(
(1L, "a", "foo", 1.0), (2L, "b", "bar", 2.0), (3L, "a", "bar", 3.0)
)).toDF("id", "x1", "x2", "x3")
val featureCols = Array("x1", "x2", "x3")
val featureColsIdx = featureCols.map(c => s"${c}_i")
val indexers = featureCols.map(
c => new StringIndexer().setInputCol(c).setOutputCol(s"${c}_i")
)
val assembler = new VectorAssembler()
.setInputCols(featureColsIdx)
.setOutputCol("features")
val slicer = new VectorSlicer()
.setInputCol("features")
.setOutputCol("string_features")
.setNames(featureColsIdx.init)
val transformed = new Pipeline()
.setStages(indexers :+ assembler :+ slicer)
.fit(df)
.transform(df)
首先我们可以从特征中提取所需的元数据:
val meta = transformed.select($"string_features")
.schema.fields.head.metadata
.getMetadata("ml_attr")
.getMetadata("attrs")
.getMetadataArray("nominal")
并将其转换为更易于使用的形式
case class NominalMetadataWrapper(idx: Long, name: String, vals: Array[String])
// In general it could a good idea to make it a broadcast variable
val lookup = meta.map(m => NominalMetadataWrapper(
m.getLong("idx"), m.getString("name"), m.getStringArray("vals")
))
最后一个小UDF:
import scala.util.Try
val transFeatures = udf((v: Vector) => lookup.map{
m => Try(m.vals(v(m.idx.toInt).toInt)).toOption
})
transformed.select(transFeatures($"string_features")).