Spark UDF with non-column parameters
I would like to pass a variable, rather than a column, to a UDF in Spark. The UDF looks like this:
val joinUDF = udf((replacementLookup: Map[String, Double], newValue: String) => {
  replacementLookup.get(newValue) match {
    case Some(tt) => tt
    case None => 0.0
  }
})
and it should be applied like this:
(columnsMap).foldLeft(df) { (currentDF, colName) =>
  println(colName._1)
  println(colName._2)
  currentDF
    .withColumn("myColumn_" + colName._1, joinUDF(colName._2, col(colName._1)))
}
but this throws:
type mismatch;
[error] found : Map
[error] required: org.apache.spark.sql.Column
[error] .withColumn("myColumn_" + colName._1, joinUDF(colName._2, col(colName._1)))
If you want to pass a literal to a UDF, use org.apache.spark.sql.functions.lit, i.e. joinUDF(lit(colName._2), col(colName._1)). However, lit does not support Maps, so you have to rewrite your code, for example by applying the Map parameter before creating the UDF:
val joinFunction = (replacementLookup: Map[String, Double], newValue: String) => {
  replacementLookup.get(newValue) match {
    case Some(tt) => tt
    case None => 0.0
  }
}

(columnsMap).foldLeft(df) { (currentDF, colName) =>
  val joinUDF = udf(joinFunction(colName._2, _: String))
  currentDF
    .withColumn("myColumn_" + colName._1, joinUDF(col(colName._1)))
}
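Putting this answer together into a runnable sketch (the DataFrame and the columnsMap below are hypothetical sample data, not from the question; getOrElse is used as a shorthand for the match on Option):

```scala
import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._

// Hypothetical sample data: column name -> replacement lookup table
val df = Seq(("a", "x"), ("b", "y")).toDF("c1", "c2")
val columnsMap = Map(
  "c1" -> Map("a" -> 1.5, "b" -> 3.0),
  "c2" -> Map("x" -> 0.5)
)

val joinFunction = (replacementLookup: Map[String, Double], newValue: String) =>
  replacementLookup.getOrElse(newValue, 0.0)

val result = columnsMap.foldLeft(df) { case (currentDF, (name, lookup)) =>
  // Partially apply the Map, so the UDF only takes the remaining String argument
  val joinUDF = udf(joinFunction(lookup, _: String))
  currentDF.withColumn("myColumn_" + name, joinUDF(col(name)))
}
```

Because the Map is applied before udf is called, each generated UDF closes over its lookup table and only receives a Column argument, which resolves the type mismatch.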
You can use currying:
import org.apache.spark.sql.functions._
val df = Seq(("a", 1), ("b", 2)).toDF("StringColumn", "IntColumn")
def joinUDF(replacementLookup: Map[String, Double]) = udf((newValue: String) => {
  replacementLookup.get(newValue) match {
    case Some(tt) => tt
    case None => 0.0
  }
})
val myMap = Map("a" -> 1.5, "b" -> 3.0)
df.select(joinUDF(myMap)($"StringColumn")).show()
Alternatively, you can try using a broadcast variable:
import org.apache.spark.sql.functions._
val df = Seq(("a", 1), ("b", 2)).toDF("StringColumn", "IntColumn")
val myMap = Map("a" -> 1.5, "b" -> 3.0)
val broadcastedMap = sc.broadcast(myMap)
def joinUDF = udf((newValue: String) => {
  broadcastedMap.value.get(newValue) match {
    case Some(tt) => tt
    case None => 0.0
  }
})
df.select(joinUDF($"StringColumn")).show()
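The broadcast approach also fits the foldLeft from the question. A sketch (assuming the question's columnsMap of column name to lookup Map, and a SparkSession named spark in place of the shell's sc):

```scala
import org.apache.spark.sql.functions.{col, udf}

val result = columnsMap.foldLeft(df) { case (currentDF, (name, lookup)) =>
  // Broadcast each lookup Map once so it is shipped to executors a single time,
  // instead of being serialized into every task's closure
  val bc = spark.sparkContext.broadcast(lookup)
  val lookupUDF = udf((v: String) => bc.value.getOrElse(v, 0.0))
  currentDF.withColumn("myColumn_" + name, lookupUDF(col(name)))
}
```

Broadcasting mainly pays off when the lookup Maps are large; for small Maps the closure-capture variants above behave the same.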