如何在 MLlib 中编写自定义 Transformer?
How to write a custom Transformer in MLlib?
我想在 scala 的 spark 2.0 中为管道编写自定义 Transformer
。到目前为止,我还不是很清楚 copy
或 transformSchema
方法应该 return。他们 return 一个 null
是正确的吗? https://github.com/SupunS/play-ground/blob/master/test.spark.client_2/src/main/java/CustomTransformer.java 复制?
作为 Transformer
扩展 PipelineStage
我得出结论,fit
调用了 transformSchema
方法。我是否正确理解 transformSchema
类似于 sk-learns fit?
因为我的 Transformer
应该将数据集与(非常小的)第二个数据集连接起来,所以我也想将该数据集存储在序列化管道中。我应该如何将其存储在转换器中以正确使用管道序列化机制?
计算单个列的平均值并填充 nan 值 + 保留该值的简单转换器会是什么样子?
@SerialVersionUID(serialVersionUID) // TODO store ibanList in copy + persist
class Preprocessor2(someValue: Dataset[SomeOtherValues]) extends Transformer {
def transform(df: Dataset[MyClass]): DataFrame = {
}
override def copy(extra: ParamMap): Transformer = {
}
override def transformSchema(schema: StructType): StructType = {
schema
}
}
transformSchema
应该 return 应用 Transformer
后预期的架构。示例:
如果transfomer添加了IntegerType
的列,输出的列名是foo
:
import org.apache.spark.sql.types._
override def transformSchema(schema: StructType): StructType = {
schema.add(StructField("foo", IntegerType))
}
So if the schema is not changed for the dataset as only a name value is filled for mean imputation I should return the original case class as the schema?
在 Spark SQL(和 MLlib,也是)中是不可能的,因为 Dataset
一旦创建就 不可变 。您只能添加或 "replace"(添加后跟 drop
操作)列。
首先,我不确定您是否想要 Transformer
本身(或 UnaryTransformer
作为 ),如您所说:
How would a simple transformer look like which computes the mean for a single column and fills the nan values + persists this value?
对我来说,就好像你想应用一个聚合函数(又名聚合),"join" 它与所有列一起产生最终值或 NaN。
它 看起来 就像你想要一个 groupBy
为 mean
做聚合然后 join
这可能是一个 window 聚合也是。
无论如何,我将从 UnaryTransformer
开始,这将解决您问题中的第一个问题:
So far it is not really clear for me what the copy
or transformSchema
methods should return. Is it correct that they return a null?
参见 the complete project spark-mllib-custom-transformer at GitHub,其中我将 UnaryTransformer
实现为 toUpperCase
一个字符串列,对于 UnaryTransformer 如下所示:
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{DataType, StringType}
class UpperTransformer(override val uid: String)
extends UnaryTransformer[String, String, UpperTransformer] {
def this() = this(Identifiable.randomUID("upp"))
override protected def createTransformFunc: String => String = {
_.toUpperCase
}
override protected def outputDataType: DataType = StringType
}
我想在 scala 的 spark 2.0 中为管道编写自定义 Transformer
。到目前为止,我还不是很清楚 copy
或 transformSchema
方法应该 return。他们 return 一个 null
是正确的吗? https://github.com/SupunS/play-ground/blob/master/test.spark.client_2/src/main/java/CustomTransformer.java 复制?
作为 Transformer
扩展 PipelineStage
我得出结论,fit
调用了 transformSchema
方法。我是否正确理解 transformSchema
类似于 sk-learns fit?
因为我的 Transformer
应该将数据集与(非常小的)第二个数据集连接起来,所以我也想将该数据集存储在序列化管道中。我应该如何将其存储在转换器中以正确使用管道序列化机制?
计算单个列的平均值并填充 nan 值 + 保留该值的简单转换器会是什么样子?
@SerialVersionUID(serialVersionUID) // TODO store ibanList in copy + persist
class Preprocessor2(someValue: Dataset[SomeOtherValues]) extends Transformer {
def transform(df: Dataset[MyClass]): DataFrame = {
}
override def copy(extra: ParamMap): Transformer = {
}
override def transformSchema(schema: StructType): StructType = {
schema
}
}
transformSchema
应该 return 应用 Transformer
后预期的架构。示例:
如果transfomer添加了
IntegerType
的列,输出的列名是foo
:import org.apache.spark.sql.types._ override def transformSchema(schema: StructType): StructType = { schema.add(StructField("foo", IntegerType)) }
So if the schema is not changed for the dataset as only a name value is filled for mean imputation I should return the original case class as the schema?
在 Spark SQL(和 MLlib,也是)中是不可能的,因为 Dataset
一旦创建就 不可变 。您只能添加或 "replace"(添加后跟 drop
操作)列。
首先,我不确定您是否想要 Transformer
本身(或 UnaryTransformer
作为
How would a simple transformer look like which computes the mean for a single column and fills the nan values + persists this value?
对我来说,就好像你想应用一个聚合函数(又名聚合),"join" 它与所有列一起产生最终值或 NaN。
它 看起来 就像你想要一个 groupBy
为 mean
做聚合然后 join
这可能是一个 window 聚合也是。
无论如何,我将从 UnaryTransformer
开始,这将解决您问题中的第一个问题:
So far it is not really clear for me what the
copy
ortransformSchema
methods should return. Is it correct that they return a null?
参见 the complete project spark-mllib-custom-transformer at GitHub,其中我将 UnaryTransformer
实现为 toUpperCase
一个字符串列,对于 UnaryTransformer 如下所示:
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{DataType, StringType}
class UpperTransformer(override val uid: String)
extends UnaryTransformer[String, String, UpperTransformer] {
def this() = this(Identifiable.randomUID("upp"))
override protected def createTransformFunc: String => String = {
_.toUpperCase
}
override protected def outputDataType: DataType = StringType
}