如何在 MLlib 中编写自定义 Transformer?

How to write a custom Transformer in MLlib?

我想在 scala 的 spark 2.0 中为管道编写自定义 Transformer。到目前为止,我还不是很清楚 copytransformSchema 方法应该 return。他们 return 一个 null 是正确的吗? https://github.com/SupunS/play-ground/blob/master/test.spark.client_2/src/main/java/CustomTransformer.java 复制?

作为 Transformer 扩展 PipelineStage 我得出结论,fit 调用了 transformSchema 方法。我是否正确理解 transformSchema 类似于 sk-learns fit?

因为我的 Transformer 应该将数据集与(非常小的)第二个数据集连接起来,所以我也想将该数据集存储在序列化管道中。我应该如何将其存储在转换器中以正确使用管道序列化机制?

计算单个列的平均值并填充 nan 值 + 保留该值的简单转换器会是什么样子?

@SerialVersionUID(serialVersionUID) // TODO store ibanList in copy + persist
    class Preprocessor2(someValue: Dataset[SomeOtherValues]) extends Transformer {

      def transform(df: Dataset[MyClass]): DataFrame = {

      }

      override def copy(extra: ParamMap): Transformer = {
      }

      override def transformSchema(schema: StructType): StructType = {
        schema
      }
    }

transformSchema 应该 return 应用 Transformer 后预期的架构。示例:

  • 如果transfomer添加了IntegerType的列,输出的列名是foo:

    import org.apache.spark.sql.types._
    
    override def transformSchema(schema: StructType): StructType = {
       schema.add(StructField("foo", IntegerType))
    }
    

So if the schema is not changed for the dataset as only a name value is filled for mean imputation I should return the original case class as the schema?

在 Spark SQL(和 MLlib,也是)中是不可能的,因为 Dataset 一旦创建就 不可变 。您只能添加或 "replace"(添加后跟 drop 操作)列。

首先,我不确定您是否想要 Transformer 本身(或 UnaryTransformer 作为 ),如您所说:

How would a simple transformer look like which computes the mean for a single column and fills the nan values + persists this value?

对我来说,就好像你想应用一个聚合函数(又名聚合),"join" 它与所有列一起产生最终值或 NaN。

看起来 就像你想要一个 groupBymean 做聚合然后 join 这可能是一个 window 聚合也是。

无论如何,我将从 UnaryTransformer 开始,这将解决您问题中的第一个问题:

So far it is not really clear for me what the copy or transformSchema methods should return. Is it correct that they return a null?

参见 the complete project spark-mllib-custom-transformer at GitHub,其中我将 UnaryTransformer 实现为 toUpperCase 一个字符串列,对于 UnaryTransformer 如下所示:

import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{DataType, StringType}

class UpperTransformer(override val uid: String)
  extends UnaryTransformer[String, String, UpperTransformer] {

  def this() = this(Identifiable.randomUID("upp"))

  override protected def createTransformFunc: String => String = {
    _.toUpperCase
  }

  override protected def outputDataType: DataType = StringType
}