如何从 UDF 创建自定义转换器?
How to create a custom Transformer from a UDF?
我试图创建并保存带有自定义阶段的 Pipeline。我需要使用 UDF
将 column
添加到我的 DataFrame
。因此,我想知道是否可以将 UDF
或类似的操作转换为 Transformer
?
我的自定义 UDF
看起来像这样,我想学习如何使用 UDF
作为自定义 Transformer
。
def getFeatures(n: String) = {
val NUMBER_FEATURES = 4
val name = n.split(" +")(0).toLowerCase
((1 to NUMBER_FEATURES)
.filter(size => size <= name.length)
.map(size => name.substring(name.length - size)))
}
val tokenizeUDF = sqlContext.udf.register("tokenize", (name: String) => getFeatures(name))
它不是一个功能齐全的解决方案,但您可以从这样的事情开始:
import org.apache.spark.ml.{UnaryTransformer}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}
class NGramTokenizer(override val uid: String)
extends UnaryTransformer[String, Seq[String], NGramTokenizer] {
def this() = this(Identifiable.randomUID("ngramtokenizer"))
override protected def createTransformFunc: String => Seq[String] = {
getFeatures _
}
override protected def validateInputType(inputType: DataType): Unit = {
require(inputType == StringType)
}
override protected def outputDataType: DataType = {
new ArrayType(StringType, true)
}
}
快速检查:
val df = Seq((1L, "abcdef"), (2L, "foobar")).toDF("k", "v")
val transformer = new NGramTokenizer().setInputCol("v").setOutputCol("vs")
transformer.transform(df).show
// +---+------+------------------+
// | k| v| vs|
// +---+------+------------------+
// | 1|abcdef|[f, ef, def, cdef]|
// | 2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+
您甚至可以尝试将其概括为如下内容:
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
import scala.reflect.runtime.universe._
class UnaryUDFTransformer[T : TypeTag, U : TypeTag](
override val uid: String,
f: T => U
) extends UnaryTransformer[T, U, UnaryUDFTransformer[T, U]] {
override protected def createTransformFunc: T => U = f
override protected def validateInputType(inputType: DataType): Unit =
require(inputType == schemaFor[T].dataType)
override protected def outputDataType: DataType = schemaFor[U].dataType
}
val transformer = new UnaryUDFTransformer("featurize", getFeatures)
.setInputCol("v")
.setOutputCol("vs")
如果您想使用 UDF 而不是包装函数,您必须直接扩展 Transformer
并覆盖 transform
方法。不幸的是,大多数有用的 类 都是私有的,因此它可能相当棘手。
或者您可以注册 UDF:
spark.udf.register("getFeatures", getFeatures _)
并使用SQLTransformer
import org.apache.spark.ml.feature.SQLTransformer
val transformer = new SQLTransformer()
.setStatement("SELECT *, getFeatures(v) AS vs FROM __THIS__")
transformer.transform(df).show
// +---+------+------------------+
// | k| v| vs|
// +---+------+------------------+
// | 1|abcdef|[f, ef, def, cdef]|
// | 2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+
我最初尝试扩展 Transformer
和 UnaryTransformer
摘要,但遇到了我的应用程序无法达到 DefaultParamsWriteable
的问题。作为一个可能与您的问题相关的示例,我根据 this example 创建了一个简单的术语规范器作为 UDF。我的目标是将术语与模式和集合相匹配,以用通用术语替换它们。例如:
"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b".r -> "emailaddr"
这是class
import scala.util.matching.Regex
class TermNormalizer(normMap: Map[Any, String]) {
val normalizationMap = normMap
def normalizeTerms(terms: Seq[String]): Seq[String] = {
var termsUpdated = terms
for ((term, idx) <- termsUpdated.view.zipWithIndex) {
for (normalizer <- normalizationMap.keys: Iterable[Any]) {
normalizer match {
case (regex: Regex) =>
if (!regex.findFirstIn(term).isEmpty) termsUpdated =
termsUpdated.updated(idx, normalizationMap(regex))
case (set: Set[String]) =>
if (set.contains(term)) termsUpdated =
termsUpdated.updated(idx, normalizationMap(set))
}
}
}
termsUpdated
}
}
我是这样使用的:
val testMap: Map[Any, String] = Map("hadoop".r -> "elephant",
"spark".r -> "sparky", "cool".r -> "neat",
Set("123", "456") -> "set1",
Set("789", "10") -> "set2")
val testTermNormalizer = new TermNormalizer(testMap)
val termNormalizerUdf = udf(testTermNormalizer.normalizeTerms(_: Seq[String]))
val trainingTest = sqlContext.createDataFrame(Seq(
(0L, "spark is cool 123", 1.0),
(1L, "adsjkfadfk akjdsfhad 456", 0.0),
(2L, "spark rocks my socks 789 10", 1.0),
(3L, "hadoop is cool 10", 0.0)
)).toDF("id", "text", "label")
val testTokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val tokenizedTrainingTest = testTokenizer.transform(trainingTest)
println(tokenizedTrainingTest
.select($"id", $"text", $"words", termNormalizerUdf($"words"), $"label").show(false))
现在我更仔细地阅读了这个问题,听起来你是在问如何避免这样做,哈哈。无论如何,我仍然会 post 它以防将来有人正在寻找一种简单的方法来应用类似 transformer 的功能
如果您也希望使转换器可写,那么您可以在您选择的 public 包中的 sharedParams 库中重新实现 HasInputCol 等特性,然后将它们与 DefaultParamsWritable 特性一起使用使转换器持久化。
这样您还可以避免将部分代码放在 spark core ml 包中,但您可以在自己的包中维护一组平行的参数。这不是真正的问题,因为它们几乎从不改变。
但是请跟踪他们的 JIRA 板 here 中的错误,该错误要求制作一些通用的共享参数 public 而不是 ml 私有的,以便人们可以直接从外部使用它们类.
我试图创建并保存带有自定义阶段的 Pipeline。我需要使用 UDF
将 column
添加到我的 DataFrame
。因此,我想知道是否可以将 UDF
或类似的操作转换为 Transformer
?
我的自定义 UDF
看起来像这样,我想学习如何使用 UDF
作为自定义 Transformer
。
def getFeatures(n: String) = {
val NUMBER_FEATURES = 4
val name = n.split(" +")(0).toLowerCase
((1 to NUMBER_FEATURES)
.filter(size => size <= name.length)
.map(size => name.substring(name.length - size)))
}
val tokenizeUDF = sqlContext.udf.register("tokenize", (name: String) => getFeatures(name))
它不是一个功能齐全的解决方案,但您可以从这样的事情开始:
import org.apache.spark.ml.{UnaryTransformer}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}
class NGramTokenizer(override val uid: String)
extends UnaryTransformer[String, Seq[String], NGramTokenizer] {
def this() = this(Identifiable.randomUID("ngramtokenizer"))
override protected def createTransformFunc: String => Seq[String] = {
getFeatures _
}
override protected def validateInputType(inputType: DataType): Unit = {
require(inputType == StringType)
}
override protected def outputDataType: DataType = {
new ArrayType(StringType, true)
}
}
快速检查:
val df = Seq((1L, "abcdef"), (2L, "foobar")).toDF("k", "v")
val transformer = new NGramTokenizer().setInputCol("v").setOutputCol("vs")
transformer.transform(df).show
// +---+------+------------------+
// | k| v| vs|
// +---+------+------------------+
// | 1|abcdef|[f, ef, def, cdef]|
// | 2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+
您甚至可以尝试将其概括为如下内容:
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
import scala.reflect.runtime.universe._
class UnaryUDFTransformer[T : TypeTag, U : TypeTag](
override val uid: String,
f: T => U
) extends UnaryTransformer[T, U, UnaryUDFTransformer[T, U]] {
override protected def createTransformFunc: T => U = f
override protected def validateInputType(inputType: DataType): Unit =
require(inputType == schemaFor[T].dataType)
override protected def outputDataType: DataType = schemaFor[U].dataType
}
val transformer = new UnaryUDFTransformer("featurize", getFeatures)
.setInputCol("v")
.setOutputCol("vs")
如果您想使用 UDF 而不是包装函数,您必须直接扩展 Transformer
并覆盖 transform
方法。不幸的是,大多数有用的 类 都是私有的,因此它可能相当棘手。
或者您可以注册 UDF:
spark.udf.register("getFeatures", getFeatures _)
并使用SQLTransformer
import org.apache.spark.ml.feature.SQLTransformer
val transformer = new SQLTransformer()
.setStatement("SELECT *, getFeatures(v) AS vs FROM __THIS__")
transformer.transform(df).show
// +---+------+------------------+
// | k| v| vs|
// +---+------+------------------+
// | 1|abcdef|[f, ef, def, cdef]|
// | 2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+
我最初尝试扩展 Transformer
和 UnaryTransformer
摘要,但遇到了我的应用程序无法达到 DefaultParamsWriteable
的问题。作为一个可能与您的问题相关的示例,我根据 this example 创建了一个简单的术语规范器作为 UDF。我的目标是将术语与模式和集合相匹配,以用通用术语替换它们。例如:
"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b".r -> "emailaddr"
这是class
import scala.util.matching.Regex
class TermNormalizer(normMap: Map[Any, String]) {
val normalizationMap = normMap
def normalizeTerms(terms: Seq[String]): Seq[String] = {
var termsUpdated = terms
for ((term, idx) <- termsUpdated.view.zipWithIndex) {
for (normalizer <- normalizationMap.keys: Iterable[Any]) {
normalizer match {
case (regex: Regex) =>
if (!regex.findFirstIn(term).isEmpty) termsUpdated =
termsUpdated.updated(idx, normalizationMap(regex))
case (set: Set[String]) =>
if (set.contains(term)) termsUpdated =
termsUpdated.updated(idx, normalizationMap(set))
}
}
}
termsUpdated
}
}
我是这样使用的:
val testMap: Map[Any, String] = Map("hadoop".r -> "elephant",
"spark".r -> "sparky", "cool".r -> "neat",
Set("123", "456") -> "set1",
Set("789", "10") -> "set2")
val testTermNormalizer = new TermNormalizer(testMap)
val termNormalizerUdf = udf(testTermNormalizer.normalizeTerms(_: Seq[String]))
val trainingTest = sqlContext.createDataFrame(Seq(
(0L, "spark is cool 123", 1.0),
(1L, "adsjkfadfk akjdsfhad 456", 0.0),
(2L, "spark rocks my socks 789 10", 1.0),
(3L, "hadoop is cool 10", 0.0)
)).toDF("id", "text", "label")
val testTokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val tokenizedTrainingTest = testTokenizer.transform(trainingTest)
println(tokenizedTrainingTest
.select($"id", $"text", $"words", termNormalizerUdf($"words"), $"label").show(false))
现在我更仔细地阅读了这个问题,听起来你是在问如何避免这样做,哈哈。无论如何,我仍然会 post 它以防将来有人正在寻找一种简单的方法来应用类似 transformer 的功能
如果您也希望使转换器可写,那么您可以在您选择的 public 包中的 sharedParams 库中重新实现 HasInputCol 等特性,然后将它们与 DefaultParamsWritable 特性一起使用使转换器持久化。
这样您还可以避免将部分代码放在 spark core ml 包中,但您可以在自己的包中维护一组平行的参数。这不是真正的问题,因为它们几乎从不改变。
但是请跟踪他们的 JIRA 板 here 中的错误,该错误要求制作一些通用的共享参数 public 而不是 ml 私有的,以便人们可以直接从外部使用它们类.