Spark udf初始化
Spark udf initialization
我想在 Spark SQL 中创建自定义的基于正则表达式的 UDF。我的偏好是创建一个内存驻留
Map[String,Pattern]
其中 Pattern 指的是字符串键的已编译正则表达式版本。但要做到这一点,我们需要将地图创建放入 UDF 的 "initialize" 函数中。
Spark udf 是否有任何结构支持跨调用的持久状态(通过 Spark SQL)?
请注意,HIVE 确实支持 UDF 的生命周期。我用它来生成解析树作为初始化的一部分,以便 UDF 的实际调用是针对不涉及解析的闪电般快速的树。
让我们从导入和一些虚拟数据开始:
import org.apache.spark.sql.functions.udf
import scala.util.matching.Regex
import java.util.regex.Pattern
val df = sc.parallelize(Seq(
("foo", "this is bar"), ("foo", "this is foo"),
("bar", "foobar"), ("bar", "foo and foo")
)).toDF("type", "value")
和地图:
val patterns: Map[String, Pattern] = Seq(("foo", ".*foo.*"), ("bar", ".*bar.*"))
.map{case (k, v) => (k, new Regex(v).pattern)}
.toMap
现在我看到两个不同的选项:
使patterns
成为在udf
中引用的广播变量
val patternsBd = sc.broadcast(patterns)
val typeMatchedViaBroadcast = udf((t: String, v: String) =>
patternsBd.value.get(t).map(m => m.matcher(v).matches))
df.withColumn("match", typeMatchedViaBroadcast($"type", $"value")).show
// +----+-----------+-----+
// |type| value|match|
// +----+-----------+-----+
// | foo|this is bar|false|
// | foo|this is foo| true|
// | bar| foobar| true|
// | bar|foo and foo|false|
// +----+-----------+-----+
在闭包内传递映射
def makeTypeMatchedViaClosure(patterns: Map[String, Pattern]) = udf(
(t: String, v: String) => patterns.get(t).map(m => m.matcher(v).matches))
val typeMatchedViaClosure = makeTypeMatchedViaClosure(patterns)
df.withColumn("match", typeMatchedViaClosure($"type", $"value")).show
// +----+-----------+-----+
// |type| value|match|
// +----+-----------+-----+
// | foo|this is bar|false|
// | foo|this is foo| true|
// | bar| foobar| true|
// | bar|foo and foo|false|
// +----+-----------+-----+
我想在 Spark SQL 中创建自定义的基于正则表达式的 UDF。我的偏好是创建一个内存驻留
Map[String,Pattern]
其中 Pattern 指的是字符串键的已编译正则表达式版本。但要做到这一点,我们需要将地图创建放入 UDF 的 "initialize" 函数中。
Spark udf 是否有任何结构支持跨调用的持久状态(通过 Spark SQL)?
请注意,HIVE 确实支持 UDF 的生命周期。我用它来生成解析树作为初始化的一部分,以便 UDF 的实际调用是针对不涉及解析的闪电般快速的树。
让我们从导入和一些虚拟数据开始:
import org.apache.spark.sql.functions.udf
import scala.util.matching.Regex
import java.util.regex.Pattern
val df = sc.parallelize(Seq(
("foo", "this is bar"), ("foo", "this is foo"),
("bar", "foobar"), ("bar", "foo and foo")
)).toDF("type", "value")
和地图:
val patterns: Map[String, Pattern] = Seq(("foo", ".*foo.*"), ("bar", ".*bar.*"))
.map{case (k, v) => (k, new Regex(v).pattern)}
.toMap
现在我看到两个不同的选项:
使
中引用的广播变量patterns
成为在udf
val patternsBd = sc.broadcast(patterns) val typeMatchedViaBroadcast = udf((t: String, v: String) => patternsBd.value.get(t).map(m => m.matcher(v).matches)) df.withColumn("match", typeMatchedViaBroadcast($"type", $"value")).show // +----+-----------+-----+ // |type| value|match| // +----+-----------+-----+ // | foo|this is bar|false| // | foo|this is foo| true| // | bar| foobar| true| // | bar|foo and foo|false| // +----+-----------+-----+
在闭包内传递映射
def makeTypeMatchedViaClosure(patterns: Map[String, Pattern]) = udf( (t: String, v: String) => patterns.get(t).map(m => m.matcher(v).matches)) val typeMatchedViaClosure = makeTypeMatchedViaClosure(patterns) df.withColumn("match", typeMatchedViaClosure($"type", $"value")).show // +----+-----------+-----+ // |type| value|match| // +----+-----------+-----+ // | foo|this is bar|false| // | foo|this is foo| true| // | bar| foobar| true| // | bar|foo and foo|false| // +----+-----------+-----+