多列和集合元素的 spark UDF 模式匹配

spark UDF pattern matching for multiple columns and collection elements

给定 df 如下:

val df = spark.createDataFrame(Seq(
(1, 2, 3),
(3, 2, 1)
)).toDF("One", "Two", "Three")

具有架构:

我想写一个 udfThree columns 作为输入;和 returns 基于最高输入值的新列,如下所示:

import org.apache.spark.sql.functions.udf


def udfScoreToCategory=udf((One: Int, Two: Int, Three: Int): Int => {
    cols match {
    case cols if One > Two && One > Three => 1
    case cols if Two > One && Two > Three => 2
    case _ => 0
}}

看看如何用 vector type 作为输入做类似的事情会很有趣:

import org.apache.spark.ml.linalg.Vector

def udfVectorToCategory=udf((cols:org.apache.spark.ml.linalg.Vector): Int => {
    cols match {
    case cols if cols(0) > cols(1) && cols(0) > cols(2) => 1,
    case cols if cols(1) > cols(0) && cols(1) > cols(2) => 2
    case _ => 0
}})

我能够通过以下方式找到向量的最大元素:

  val vectorToCluster = udf{ (x: Vector) => x.argmax }

但是,我仍然很困惑如何对多列值进行模式匹配。

一些问题:

  • cols 第一个例子不在范围内。
  • (...): T => ... 不是匿名函数的有效语法。
  • 这里最好用val而不是def

一种定义方式:

val udfScoreToCategory = udf[Int, (Int, Int, Int)]{
  case (one, two, three) if one > two && one > three => 1
  case (one, two, three) if two > one && two > three => 2
  case _ => 0
}

val udfVectorToCategory = udf[Int, org.apache.spark.ml.linalg.Vector]{
  _.toArray match {
    case Array(one, two, three) if one > two && one > three => 1
    case Array(one, two, three) if two > one && two > three => 2
    case _ => 0
}}

一般来说,对于第一种情况你应该使用``when`

import org.apache.spark.sql.functions.when

when ($"one" > $"two" && $"one" > $"three", 1)
  .when ($"two" > $"one" && $"two" > $"three", 2)
  .otherwise(0)

其中 onetwothree 是列名。