有没有办法创建一个接受两个字符串数组并将这些字符串作为两个参数传递给函数的 UDF?

Is there a way to create a UDF that accepts an array of two strings and pass those strings as two arguments to a function?

我是 Scala 的新手,请原谅我糟糕的文笔。 我有一个函数 func1 接受两个字符串和 returns 一个字符串。 我还有一个数据框 df1,它有 2 列 a1 和 b1。我正在尝试使用 df1 的列(a1 和 b1)和作为函数 func1 输出的新列 c1 创建一个新的数据框 df2。我知道我需要使用 UDF。我不知道如何创建一个可以接受 2 列的 UDF,并将这两个作为参数传递给 func1 和 return 输出字符串(c1 列)。

这是我尝试过的一些方法 -

def func1(str1:String, str2:String) : String = {   
        //code
        return str3;
}

val df1= spark.sql("select * from emp")
  .select("a1", "b1").cache()


val df2 = spark.sql("select * from df1")
  .withColumn("c1", func1("a1","b1"))
  .select("a1", "b1").cache()

但我没有得到结果。请指教。提前致谢。

你基本上是语法问题。

请记住,当您这样做时 def func1(str1:String, str2:String) : String = ... func1 指的是 Scala 函数对象,而不是 Spark 表达式。

另一方面,.withColumn 需要一个 Spark 表达式作为它的第二个参数。

那么发生的事情是您对 .withColumn("c1", func1("a1","b1")) 的调用向 Spark 发送了一个 Scala function 对象,而 Spark API 期望一个 "Spark Expression" (例如,一列,或者对列的操作,例如用户定义函数 (UDF))。

幸运的是,将 Scala 函数转换为 Spark UDF 很容易,一般来说,通过调用 spark 的 udf 方法对其进行包装。

所以一个有效的例子可以这样出来:

// A sample dataframe 
val dataframe = Seq(("a", "b"), ("c", "d")).toDF("columnA", "columnB")
// An example scala function that actually does something (string concat)
def concat(first: String, second: String) = first+second
// A conversion from scala function to spark UDF :
val concatUDF = udf((first: String, second: String) => concat(first, second))
// An sample execution of the UDF
// note the $ sign, which is short for indicating a column name
dataframe.withColumn("concat", concatUDF($"columnA", $"columnB")).show
+-------+-------+------+
|columnA|columnB|concat|
+-------+-------+------+
|      a|      b|    ab|
|      c|      d|    cd|
+-------+-------+------+

从那时起,应该很容易适应您的精确函数及其参数。

这是你会怎么做

scala> val df = Seq(("John","26"),("Bob","31")).toDF("a1","b1")
df: org.apache.spark.sql.DataFrame = [a1: string, b1: string]

scala> df.createOrReplaceTempView("emp")

scala> :paste
// Entering paste mode (ctrl-D to finish)

def func1(str1:String, str2:String) : String = {
        val str3 = s" ${str1} is ${str2} years old"
        return str3;
}

// Exiting paste mode, now interpreting.

func1: (str1: String, str2: String)String

scala> val my_udf_func1 = udf( func1(_:String,_:String):String )
my_udf_func1: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,StringType,Some(List(StringType, StringType)))

scala> spark.sql("select * from emp").withColumn("c1", my_udf_func1($"a1",$"b1")).show(false)
2019-01-14 21:08:30 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
+----+---+---------------------+
|a1  |b1 |c1                   |
+----+---+---------------------+
|John|26 | John is 26 years old|
|Bob |31 | Bob is 31 years old |
+----+---+---------------------+


scala>

有两处需要更正..

定义正则函数后,需要在udf()中注册为

val my_udf_func1 = udf( func1(_:String,_:String):String )

在调用 udf 时,您应该使用 $"a1" 语法,而不仅仅是 "a1"