Spark: FlatMap and CountVectorizer pipeline

I am working on a pipeline and trying to split a column's values before passing them to CountVectorizer.

For this purpose I made a custom Transformer.

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, explode, udf}
import org.apache.spark.sql.types._

class FlatMapTransformer(override val uid: String)
  extends Transformer {
  /**
   * Param for input column name.
   * @group param
   */
  final val inputCol = new Param[String](this, "inputCol", "The input column")
  final def getInputCol: String = $(inputCol)

  /**
   * Param for output column name.
   * @group param
   */
  final val outputCol = new Param[String](this, "outputCol", "The output column")
  final def getOutputCol: String = $(outputCol)

  def setInputCol(value: String): this.type = set(inputCol, value)
  def setOutputCol(value: String): this.type = set(outputCol, value)

  def this() = this(Identifiable.randomUID("FlatMapTransformer"))

  private val flatMap: String => Seq[String] = { input: String =>
    input.split(",")
  }

  override def copy(extra: ParamMap): SplitString = defaultCopy(extra)

  override def transform(dataset: Dataset[_]): DataFrame = {
    val flatMapUdf = udf(flatMap)
    dataset.withColumn($(outputCol), explode(flatMapUdf(col($(inputCol)))))
  }

  override def transformSchema(schema: StructType): StructType = {
    val dataType = schema($(inputCol)).dataType
    require(
      dataType.isInstanceOf[StringType],
      s"Input column must be of type StringType but got ${dataType}")
    val inputFields = schema.fields
    require(
      !inputFields.exists(_.name == $(outputCol)),
      s"Output column ${$(outputCol)} already exists.")

    DataTypes.createStructType(
      Array(
        DataTypes.createStructField($(outputCol), DataTypes.StringType, false)))
  }
}
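As a quick sanity check of the splitting logic itself, the flatMap function above is just a comma split and can be exercised outside Spark. This is a minimal plain-Scala sketch with made-up sample strings:

```scala
// Plain-Scala check of the splitting logic used by the UDF (no Spark needed).
object SplitCheck extends App {
  val flatMap: String => Seq[String] = { input: String =>
    input.split(",")
  }

  println(flatMap("red,green,blue")) // a three-element Seq of tokens
  println(flatMap("single"))         // a one-element Seq
}
```

Note that `String.split` keeps empty tokens between delimiters (`"a,,b"` yields three elements), which may or may not be what you want to feed into CountVectorizer.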

The code seems legitimate, but a problem appears when I try to chain it with other operations. Here is my pipeline:

val train = reader.readTrainingData()

val cat_features = getFeaturesByType(taskConfig, "categorical")
val num_features = getFeaturesByType(taskConfig, "numeric")
val cat_ohe_features = getFeaturesByType(taskConfig, "categorical", Some("ohe"))
val cat_features_string_index = cat_features.
  filter { feature: String => !cat_ohe_features.contains(feature) }

val catIndexer = cat_features_string_index.map {
  feature =>
    new StringIndexer()
      .setInputCol(feature)
      .setOutputCol(feature + "_index")
      .setHandleInvalid("keep")
}

val flatMapper = cat_ohe_features.map {
  feature =>
    new FlatMapTransformer()
      .setInputCol(feature)
      .setOutputCol(feature + "_transformed")
}

val countVectorizer = cat_ohe_features.map {
  feature =>
    new CountVectorizer()
      .setInputCol(feature + "_transformed")
      .setOutputCol(feature + "_vectorized")
      .setVocabSize(10)
}


// val countVectorizer = cat_ohe_features.map {
//   feature =>
//
//     val flatMapper = new FlatMapTransformer()
//       .setInputCol(feature)
//       .setOutputCol(feature + "_transformed")
// 
//     new CountVectorizer()
//       .setInputCol(flatMapper.getOutputCol)
//       .setOutputCol(feature + "_vectorized")
//       .setVocabSize(10)
// }

val cat_features_index = cat_features_string_index.map {
  (feature: String) => feature + "_index"
}

val count_vectorized_index = cat_ohe_features.map {
  (feature: String) => feature + "_vectorized"
}

val catFeatureAssembler = new VectorAssembler()
  .setInputCols(cat_features_index)
  .setOutputCol("cat_features")

val oheFeatureAssembler = new VectorAssembler()
  .setInputCols(count_vectorized_index)
  .setOutputCol("cat_ohe_features")

val numFeatureAssembler = new VectorAssembler()
  .setInputCols(num_features)
  .setOutputCol("num_features")

val featureAssembler = new VectorAssembler()
  .setInputCols(Array("cat_features", "num_features", "cat_ohe_features_vectorized"))
  .setOutputCol("features")

val pipelineStages = catIndexer ++ flatMapper ++ countVectorizer ++
  Array(
    catFeatureAssembler,
    oheFeatureAssembler,
    numFeatureAssembler,
    featureAssembler)

val pipeline = new Pipeline().setStages(pipelineStages)
pipeline.fit(dataset = train)

Running this code, I get an error: java.lang.IllegalArgumentException: Field "my_ohe_field_trasformed" does not exist.

[info]  java.lang.IllegalArgumentException: Field "from_expdelv_areas_transformed" does not exist.
[info]  at org.apache.spark.sql.types.StructType$$anonfun$apply.apply(StructType.scala:266)
[info]  at org.apache.spark.sql.types.StructType$$anonfun$apply.apply(StructType.scala:266)
[info]  at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
[info]  at scala.collection.AbstractMap.getOrElse(Map.scala:59)
[info]  at org.apache.spark.sql.types.StructType.apply(StructType.scala:265)
[info]  at org.apache.spark.ml.util.SchemaUtils$.checkColumnTypes(SchemaUtils.scala:56)
[info]  at org.apache.spark.ml.feature.CountVectorizerParams$class.validateAndTransformSchema(CountVectorizer.scala:75)
[info]  at org.apache.spark.ml.feature.CountVectorizer.validateAndTransformSchema(CountVectorizer.scala:123)
[info]  at org.apache.spark.ml.feature.CountVectorizer.transformSchema(CountVectorizer.scala:188)

When I uncomment the combined stringSplitter and countVectorizer block, the error occurs inside my Transformer instead, at the line val dataType = schema($(inputCol)).dataType:

java.lang.IllegalArgumentException: Field "my_ohe_field" does not exist.

The result of calling pipeline.getStages:

strIdx_3c2630a738f0
strIdx_0d76d55d4200
FlatMapTransformer_fd8595c2969c
FlatMapTransformer_2e9a7af0b0fa
cntVec_c2ef31f00181
cntVec_68a78eca06c9
vecAssembler_a81dd9f43d56
vecAssembler_b647d348f0a0
vecAssembler_b5065a22d5c8
vecAssembler_d9176b8bb593

I may be going down the wrong road here. Any comments are appreciated.

Your FlatMapTransformer#transform is incorrect: you are dropping/ignoring all the other columns when you select only the outputCol. Please modify your method to:

override def transform(dataset: Dataset[_]): DataFrame = {
  val flatMapUdf = udf(flatMap)
  dataset.withColumn($(outputCol), explode(flatMapUdf(col($(inputCol)))))
}

Also, modify your transformSchema to check that the input column exists before checking its data type:

override def transformSchema(schema: StructType): StructType = {
  require(schema.names.contains($(inputCol)), s"Input column ${$(inputCol)} is not present in the input dataframe")
  // ... rest as it is
}

Update 1, based on the comments:

1. Please modify the copy method (though this is not the cause of the exception you are facing):

override def copy(extra: ParamMap): FlatMapTransformer = defaultCopy(extra)

2. Please note that CountVectorizer takes a column of type ArrayType(StringType, true/false), and since the output column of FlatMapTransformer becomes the input of CountVectorizer, you need to make sure that FlatMapTransformer's output column is of type ArrayType(StringType, true/false). I think this is not the case with your current code:
  override def transform(dataset: Dataset[_]): DataFrame = {
    val flatMapUdf = udf(flatMap)
    dataset.withColumn($(outputCol), explode(flatMapUdf(col($(inputCol)))))
  }

The explode function turns each array<string> into one row per element, so the transformer's output column becomes StringType. You probably want to change this code to:

  override def transform(dataset: Dataset[_]): DataFrame = {
    val flatMapUdf = udf(flatMap)
    dataset.withColumn($(outputCol), flatMapUdf(col($(inputCol))))
  }
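To see the difference between the two variants, here is a plain-Scala analogy (a sketch on made-up data, not actual Spark calls): with explode you get one plain string per row, while without it each row keeps its whole token array, which is the shape CountVectorizer expects.

```scala
// Plain-Scala analogy of the two transform variants (hypothetical data, no Spark).
object ExplodeAnalogy extends App {
  val rows = Seq("a,b", "c")

  // With explode: one row per token, each cell a plain String (StringType-like).
  val exploded: Seq[String] = rows.flatMap(_.split(","))

  // Without explode: one row per input row, each cell an array of tokens
  // (ArrayType(StringType)-like), which is what CountVectorizer needs.
  val asArrays: Seq[Seq[String]] = rows.map(_.split(",").toSeq)

  println(exploded) // three single-token rows
  println(asArrays) // two rows, each holding its token array
}
```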

3. Modify the transformSchema method to output ArrayType(StringType):
override def transformSchema(schema: StructType): StructType = {
  val dataType = schema($(inputCol)).dataType
  require(
    dataType.isInstanceOf[StringType],
    s"Input column must be of type StringType but got ${dataType}")
  val inputFields = schema.fields
  require(
    !inputFields.exists(_.name == $(outputCol)),
    s"Output column ${$(outputCol)} already exists.")

  schema.add($(outputCol), ArrayType(StringType))
}
4. Change the vector assembler to this:
val featureAssembler = new VectorAssembler()
  .setInputCols(Array("cat_features", "num_features", "cat_ohe_features"))
  .setOutputCol("features")

I tried executing your pipeline on a dummy dataframe and it worked well. For the complete code, please refer to this gist.