将 WithColumn 与外部函数一起使用

Use WithColumn with external function

我在 DataFrame 中有以下几列的数据

  1. 文件格式为 csv
  2. 下面所有列的数据类型都是字符串

    employeeid,pexpense,cexpense

现在我需要创建一个新的 DataFrame,其中包含名为 expense 的新列,该列是根据列 pexpensecexpense 计算的。

棘手的部分是计算算法不是我创建的 UDF 函数,而是需要从 Java 库导入的外部函数将基本类型作为参数 - 在本例中为 pexpensecexpense - 计算新列所需的值。

来自外部Javajar的函数签名

public class MyJava

{

    public Double calculateExpense(Double pexpense, Double cexpense) {
       // calculation
    }

}

那么如何调用该外部函数来创建新的计算列。我可以在我的 Spark 应用程序中将该外部函数注册为 UDF 吗?

下面是对两列求和的示例:

val somme= udf((a: Int, b: int) => a+b)

val df_new = df.select(col("employeeid"), \
                       col("pexpense"),   \
                       col("pexpense"),   \
                       somme(col("pexpense"), col("pexpense")) as "expense")

您可以简单地 "wrap" UDF 中的给定方法,方法是将其作为参数传递给 org.apache.spark.sql.functions 中的 udf 函数:

import org.apache.spark.sql.functions._
import spark.implicits._

val myUdf = udf(calculateExpense _)
val newDF = df.withColumn("expense", myUdf($"pexpense", $"cexpense"))

这假设 pexpensecexpense 列都是 Double

您可以创建类似于以下内容的外部方法的 UDF(使用 Scala REPL 进行说明):

// From a Linux shell prompt:

vi MyJava.java
public class MyJava {
    public Double calculateExpense(Double pexpense, Double cexpense) {
        return pexpense + cexpense;
    }
}
:wq

javac MyJava.java
jar -cvf MyJava.jar MyJava.class

spark-shell --jars /path/to/jar/MyJava.jar

// From within the Spark shell

val df = Seq(
  ("1", "1.0", "2.0"), ("2", "3.0", "4.0")
).toDF("employeeid", "pexpense", "cexpense")

val myJava = new MyJava

val myJavaUdf = udf(
  myJava.calculateExpense _
)

val df2 = df.withColumn("totalexpense", myJavaUdf($"pexpense", $"cexpense") )

df2.show
+----------+--------+--------+------------+
|employeeid|pexpense|cexpense|totalexpense|
+----------+--------+--------+------------+
|         1|     1.0|     2.0|         3.0|
|         2|     3.0|     4.0|         7.0|
+----------+--------+--------+------------+