多列和集合元素的 spark UDF 模式匹配
spark UDF pattern matching for multiple columns and collection elements
给定 df
如下:
val df = spark.createDataFrame(Seq(
(1, 2, 3),
(3, 2, 1)
)).toDF("One", "Two", "Three")
具有架构:
我想写一个 udf
以 Three columns
作为输入;和 returns 基于最高输入值的新列,如下所示:
import org.apache.spark.sql.functions.udf
def udfScoreToCategory=udf((One: Int, Two: Int, Three: Int): Int => {
cols match {
case cols if One > Two && One > Three => 1
case cols if Two > One && Two > Three => 2
case _ => 0
}}
看看如何用 vector type
作为输入做类似的事情会很有趣:
import org.apache.spark.ml.linalg.Vector
def udfVectorToCategory=udf((cols:org.apache.spark.ml.linalg.Vector): Int => {
cols match {
case cols if cols(0) > cols(1) && cols(0) > cols(2) => 1,
case cols if cols(1) > cols(0) && cols(1) > cols(2) => 2
case _ => 0
}})
我能够通过以下方式找到向量的最大元素:
val vectorToCluster = udf{ (x: Vector) => x.argmax }
但是,我仍然很困惑如何对多列值进行模式匹配。
一些问题:
cols
第一个例子不在范围内。
(...): T => ...
不是匿名函数的有效语法。
- 这里最好用
val
而不是def
。
一种定义方式:
val udfScoreToCategory = udf[Int, (Int, Int, Int)]{
case (one, two, three) if one > two && one > three => 1
case (one, two, three) if two > one && two > three => 2
case _ => 0
}
和
val udfVectorToCategory = udf[Int, org.apache.spark.ml.linalg.Vector]{
_.toArray match {
case Array(one, two, three) if one > two && one > three => 1
case Array(one, two, three) if two > one && two > three => 2
case _ => 0
}}
一般来说,对于第一种情况你应该使用``when`
import org.apache.spark.sql.functions.when
when ($"one" > $"two" && $"one" > $"three", 1)
.when ($"two" > $"one" && $"two" > $"three", 2)
.otherwise(0)
其中 one
、two
、three
是列名。
给定 df
如下:
val df = spark.createDataFrame(Seq(
(1, 2, 3),
(3, 2, 1)
)).toDF("One", "Two", "Three")
具有架构:
我想写一个 udf
以 Three columns
作为输入;和 returns 基于最高输入值的新列,如下所示:
import org.apache.spark.sql.functions.udf
def udfScoreToCategory=udf((One: Int, Two: Int, Three: Int): Int => {
cols match {
case cols if One > Two && One > Three => 1
case cols if Two > One && Two > Three => 2
case _ => 0
}}
看看如何用 vector type
作为输入做类似的事情会很有趣:
import org.apache.spark.ml.linalg.Vector
def udfVectorToCategory=udf((cols:org.apache.spark.ml.linalg.Vector): Int => {
cols match {
case cols if cols(0) > cols(1) && cols(0) > cols(2) => 1,
case cols if cols(1) > cols(0) && cols(1) > cols(2) => 2
case _ => 0
}})
我能够通过以下方式找到向量的最大元素:
val vectorToCluster = udf{ (x: Vector) => x.argmax }
但是,我仍然很困惑如何对多列值进行模式匹配。
一些问题:
cols
第一个例子不在范围内。(...): T => ...
不是匿名函数的有效语法。- 这里最好用
val
而不是def
。
一种定义方式:
val udfScoreToCategory = udf[Int, (Int, Int, Int)]{
case (one, two, three) if one > two && one > three => 1
case (one, two, three) if two > one && two > three => 2
case _ => 0
}
和
val udfVectorToCategory = udf[Int, org.apache.spark.ml.linalg.Vector]{
_.toArray match {
case Array(one, two, three) if one > two && one > three => 1
case Array(one, two, three) if two > one && two > three => 2
case _ => 0
}}
一般来说,对于第一种情况你应该使用``when`
import org.apache.spark.sql.functions.when
when ($"one" > $"two" && $"one" > $"three", 1)
.when ($"two" > $"one" && $"two" > $"three", 2)
.otherwise(0)
其中 one
、two
、three
是列名。