将 WithColumn 与外部函数一起使用
Use WithColumn with external function
我在 DataFrame 中有以下几列的数据
- 文件格式为 csv
下面所有列的数据类型都是字符串
employeeid,pexpense,cexpense
现在我需要创建一个新的 DataFrame,其中包含名为 expense
的新列,该列是根据列 pexpense
、cexpense
计算的。
棘手的部分是计算算法不是我创建的 UDF 函数,而是需要从 Java 库导入的外部函数将基本类型作为参数 - 在本例中为 pexpense
、cexpense
- 计算新列所需的值。
来自外部Javajar的函数签名
public class MyJava
{
public Double calculateExpense(Double pexpense, Double cexpense) {
// calculation
}
}
那么如何调用该外部函数来创建新的计算列。我可以在我的 Spark 应用程序中将该外部函数注册为 UDF 吗?
下面是对两列求和的示例:
val somme= udf((a: Int, b: int) => a+b)
val df_new = df.select(col("employeeid"), \
col("pexpense"), \
col("pexpense"), \
somme(col("pexpense"), col("pexpense")) as "expense")
您可以简单地 "wrap" UDF 中的给定方法,方法是将其作为参数传递给 org.apache.spark.sql.functions
中的 udf
函数:
import org.apache.spark.sql.functions._
import spark.implicits._
val myUdf = udf(calculateExpense _)
val newDF = df.withColumn("expense", myUdf($"pexpense", $"cexpense"))
这假设 pexpense
和 cexpense
列都是 Double
。
您可以创建类似于以下内容的外部方法的 UDF(使用 Scala REPL 进行说明):
// From a Linux shell prompt:
vi MyJava.java
public class MyJava {
public Double calculateExpense(Double pexpense, Double cexpense) {
return pexpense + cexpense;
}
}
:wq
javac MyJava.java
jar -cvf MyJava.jar MyJava.class
spark-shell --jars /path/to/jar/MyJava.jar
// From within the Spark shell
val df = Seq(
("1", "1.0", "2.0"), ("2", "3.0", "4.0")
).toDF("employeeid", "pexpense", "cexpense")
val myJava = new MyJava
val myJavaUdf = udf(
myJava.calculateExpense _
)
val df2 = df.withColumn("totalexpense", myJavaUdf($"pexpense", $"cexpense") )
df2.show
+----------+--------+--------+------------+
|employeeid|pexpense|cexpense|totalexpense|
+----------+--------+--------+------------+
| 1| 1.0| 2.0| 3.0|
| 2| 3.0| 4.0| 7.0|
+----------+--------+--------+------------+
我在 DataFrame 中有以下几列的数据
- 文件格式为 csv
下面所有列的数据类型都是字符串
employeeid,pexpense,cexpense
现在我需要创建一个新的 DataFrame,其中包含名为 expense
的新列,该列是根据列 pexpense
、cexpense
计算的。
棘手的部分是计算算法不是我创建的 UDF 函数,而是需要从 Java 库导入的外部函数将基本类型作为参数 - 在本例中为 pexpense
、cexpense
- 计算新列所需的值。
来自外部Javajar的函数签名
public class MyJava
{
public Double calculateExpense(Double pexpense, Double cexpense) {
// calculation
}
}
那么如何调用该外部函数来创建新的计算列。我可以在我的 Spark 应用程序中将该外部函数注册为 UDF 吗?
下面是对两列求和的示例:
val somme= udf((a: Int, b: int) => a+b)
val df_new = df.select(col("employeeid"), \
col("pexpense"), \
col("pexpense"), \
somme(col("pexpense"), col("pexpense")) as "expense")
您可以简单地 "wrap" UDF 中的给定方法,方法是将其作为参数传递给 org.apache.spark.sql.functions
中的 udf
函数:
import org.apache.spark.sql.functions._
import spark.implicits._
val myUdf = udf(calculateExpense _)
val newDF = df.withColumn("expense", myUdf($"pexpense", $"cexpense"))
这假设 pexpense
和 cexpense
列都是 Double
。
您可以创建类似于以下内容的外部方法的 UDF(使用 Scala REPL 进行说明):
// From a Linux shell prompt:
vi MyJava.java
public class MyJava {
public Double calculateExpense(Double pexpense, Double cexpense) {
return pexpense + cexpense;
}
}
:wq
javac MyJava.java
jar -cvf MyJava.jar MyJava.class
spark-shell --jars /path/to/jar/MyJava.jar
// From within the Spark shell
val df = Seq(
("1", "1.0", "2.0"), ("2", "3.0", "4.0")
).toDF("employeeid", "pexpense", "cexpense")
val myJava = new MyJava
val myJavaUdf = udf(
myJava.calculateExpense _
)
val df2 = df.withColumn("totalexpense", myJavaUdf($"pexpense", $"cexpense") )
df2.show
+----------+--------+--------+------------+
|employeeid|pexpense|cexpense|totalexpense|
+----------+--------+--------+------------+
| 1| 1.0| 2.0| 3.0|
| 2| 3.0| 4.0| 7.0|
+----------+--------+--------+------------+