Spark 函数别名 - 高性能 udfs
Spark function aliases - performant udfs
上下文
在我编写的许多 sql 查询中,我发现自己以完全相同的方式组合 spark 预定义函数,这通常会导致 冗长和重复的代码 ,我的开发人员本能是想要重构它。
所以,我的问题是:有没有什么方法可以为函数组合定义某种 别名,而不求助于 udfs(出于性能原因应避免)-目标是使代码更清晰、更简洁。本质上,我想要的是 udfs
之类的东西,但没有性能损失。此外,这些函数 必须可以从可在 spark.sql
调用中使用的 spark-sql 查询中调用。
例子
例如,假设我的业务逻辑是反转一些字符串并像这样散列它:(请注意,这里的函数组合是无关紧要的,重要的是它是现有预定义spark的某种组合功能 - 可能有很多)
SELECT
sha1(reverse(person.name)),
sha1(reverse(person.some_information)),
sha1(reverse(person.some_other_information))
...
FROM person
有没有一种方法可以声明 business
函数而无需支付使用 udf
的性能代价,从而允许将上面的代码重写为:
SELECT
business(person.name),
business(person.some_information),
business(person.some_other_information)
...
FROM person
我在 spark 文档和这个网站上搜索了很多,但没有找到实现这个的方法,这对我来说很奇怪,因为它看起来很自然的需要,但我不理解为什么您必须为定义和调用 udf 付出 black-box 的代价。
Is there a way of declaring a business function without paying the performance price of using a udf
您不必使用 udf
,您可以扩展 Expression
class,或者对于最简单的操作 - UnaryExpression
。然后你将只需要实现几个方法,我们就开始吧。它原生集成到 Spark 中,此外还可以使用一些优势功能,例如代码生成。
在您的情况下,添加 business
函数非常简单:
def business(column: Column): Column = {
sha1(reverse(column))
}
MUST be callable from within a spark-sql query usable in spark.sql calls
这更棘手但可以实现。
您需要创建自定义函数注册商:
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.Expression
object FunctionAliasRegistrar {
val funcs: mutable.Map[String, Seq[Column] => Column] = mutable.Map.empty
def add(name: String, builder: Seq[Column] => Column): this.type = {
funcs += name -> builder
this
}
def registerAll(spark: SparkSession) = {
funcs.foreach { case (alias, builder) => {
def b(children: Seq[Expression]) = builder.apply(children.map(expr => new Column(expr))).expr
spark.sessionState.functionRegistry.registerFunction(FunctionIdentifier(alias), b)
}}
}
}
那么就可以这样使用了:
FunctionAliasRegistrar
.add("business1", child => lower(reverse(child.head)))
.add("business2", child => upper(reverse(child.head)))
.registerAll(spark)
dataset.createTempView("data")
spark.sql(
"""
| SELECT business1(name), business2(name) FROM data
|""".stripMargin)
.show(false)
输出:
+--------------------+--------------------+
|lower(reverse(name))|upper(reverse(name))|
+--------------------+--------------------+
|sined |SINED |
|taram |TARAM |
|1taram |1TARAM |
|2taram |2TARAM |
+--------------------+--------------------+
希望对您有所帮助。
上下文
在我编写的许多 sql 查询中,我发现自己以完全相同的方式组合 spark 预定义函数,这通常会导致 冗长和重复的代码 ,我的开发人员本能是想要重构它。
所以,我的问题是:有没有什么方法可以为函数组合定义某种 别名,而不求助于 udfs(出于性能原因应避免)-目标是使代码更清晰、更简洁。本质上,我想要的是 udfs
之类的东西,但没有性能损失。此外,这些函数 必须可以从可在 spark.sql
调用中使用的 spark-sql 查询中调用。
例子
例如,假设我的业务逻辑是反转一些字符串并像这样散列它:(请注意,这里的函数组合是无关紧要的,重要的是它是现有预定义spark的某种组合功能 - 可能有很多)
SELECT
sha1(reverse(person.name)),
sha1(reverse(person.some_information)),
sha1(reverse(person.some_other_information))
...
FROM person
有没有一种方法可以声明 business
函数而无需支付使用 udf
的性能代价,从而允许将上面的代码重写为:
SELECT
business(person.name),
business(person.some_information),
business(person.some_other_information)
...
FROM person
我在 spark 文档和这个网站上搜索了很多,但没有找到实现这个的方法,这对我来说很奇怪,因为它看起来很自然的需要,但我不理解为什么您必须为定义和调用 udf 付出 black-box 的代价。
Is there a way of declaring a business function without paying the performance price of using a udf
您不必使用 udf
,您可以扩展 Expression
class,或者对于最简单的操作 - UnaryExpression
。然后你将只需要实现几个方法,我们就开始吧。它原生集成到 Spark 中,此外还可以使用一些优势功能,例如代码生成。
在您的情况下,添加 business
函数非常简单:
def business(column: Column): Column = {
sha1(reverse(column))
}
MUST be callable from within a spark-sql query usable in spark.sql calls
这更棘手但可以实现。
您需要创建自定义函数注册商:
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.Expression
object FunctionAliasRegistrar {
val funcs: mutable.Map[String, Seq[Column] => Column] = mutable.Map.empty
def add(name: String, builder: Seq[Column] => Column): this.type = {
funcs += name -> builder
this
}
def registerAll(spark: SparkSession) = {
funcs.foreach { case (alias, builder) => {
def b(children: Seq[Expression]) = builder.apply(children.map(expr => new Column(expr))).expr
spark.sessionState.functionRegistry.registerFunction(FunctionIdentifier(alias), b)
}}
}
}
那么就可以这样使用了:
FunctionAliasRegistrar
.add("business1", child => lower(reverse(child.head)))
.add("business2", child => upper(reverse(child.head)))
.registerAll(spark)
dataset.createTempView("data")
spark.sql(
"""
| SELECT business1(name), business2(name) FROM data
|""".stripMargin)
.show(false)
输出:
+--------------------+--------------------+
|lower(reverse(name))|upper(reverse(name))|
+--------------------+--------------------+
|sined |SINED |
|taram |TARAM |
|1taram |1TARAM |
|2taram |2TARAM |
+--------------------+--------------------+
希望对您有所帮助。