Spark Scala UDF: java.lang.UnsupportedOperationException: Schema for type Any is not supported
I'm trying to return a map from a UDF using if/else, and I get the following exception. Any pointers?
java.lang.UnsupportedOperationException: Schema for type Any is not supported
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.functions._
import spark.implicits._ // for toDF and $"..." (already imported in spark-shell)
val df2 = Seq(
  ("1", Map("Fld1" -> "USA", "Fld2" -> "UK")),
  ("2", Map("Fld1" -> "Germany", "Fld2" -> "Portugal")),
  ("3", Map("Fld1" -> "France", "Fld2" -> "Paris"))
).toDF("id", "map")
val getmapUdf = udf((map1: Map[String, String]) => {
  val fl1 = map1.getOrElse("Fld1", "unknown")
  val fl2 = map1.getOrElse("Fld2", "unknown")
  if (fl1 == "Germany") {
    Map("key1" -> "G")
  } else if (fl1 == "France") {
    if (fl2 == "UK") {
      Map("key1" -> "U")
    } else {
      Map("key1" -> "Y")
    }
  } else if (fl1 == "France") {
    Map("key1" -> "G")
  }
})
var temp2 = df2.withColumn("mymap", getmapUdf($"map"))
temp2.show(false)
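You get that error because your UDF does not always return a Map[String, String]: the if/else chain has no final else for the case where none of the conditions match. On that path the expression evaluates to () (Unit), so Scala infers the common type of Map[String, String] and Unit, which is Any, and Spark cannot derive a schema for Any.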
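To see where the Any comes from without involving Spark, here is a minimal plain-Scala sketch; the object and value names (IfElseTypes, noElse, withElse) are hypothetical. It assumes Scala 2, which Spark 2.x and 3.x are built with: there, if (c) e with no else is treated as if (c) e else (), so the no-match path yields () and the inferred result type widens to Any.

```scala
object IfElseTypes {
  // No final else: when neither condition matches, the expression evaluates to ()
  // (Unit). In Scala 2 the inferred result type is therefore Any, which is the
  // return type Spark's udf(...) would be asked to build a schema for.
  val noElse = (fl1: String) => {
    if (fl1 == "Germany") Map("key1" -> "G")
    else if (fl1 == "France") Map("key1" -> "Y")
  }

  // With a final else, every branch is a Map[String, String], so that is the
  // inferred result type and Spark can map it to a map<string,string> column.
  val withElse = (fl1: String) => {
    if (fl1 == "Germany") Map("key1" -> "G")
    else if (fl1 == "France") Map("key1" -> "Y")
    else Map("key1" -> "unknown")
  }
}
```

In the REPL, IfElseTypes.noElse("USA") returns (), while IfElseTypes.withElse("USA") returns Map(key1 -> unknown) and can be assigned to a value declared as Map[String, String].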
That said, you can actually do the same thing without a UDF by using the when function:
var temp2 = df2.withColumn(
  "mymap",
  // no .otherwise(...): rows that match none of the conditions get null
  when($"map"("Fld1") === "Germany", map(lit("key1"), lit("G")))
    .when($"map"("Fld1") === "France" && $"map"("Fld2") === "UK", map(lit("key1"), lit("U")))
    .when($"map"("Fld1") === "France", map(lit("key1"), lit("Y")))
)
temp2.show(false)
//+---+-----------------------------------+-----------+
//|id |map                                |mymap      |
//+---+-----------------------------------+-----------+
//|1  |[Fld1 -> USA, Fld2 -> UK]          |null       |
//|2  |[Fld1 -> Germany, Fld2 -> Portugal]|[key1 -> G]|
//|3  |[Fld1 -> France, Fld2 -> Paris]    |[key1 -> Y]|
//+---+-----------------------------------+-----------+
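Anyway, if you do want to use a UDF, change the function to return Option[Map[String, String]] and return None for the unmatched case (Spark stores None as null), like this: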
val getmapUdf = udf((map1: Map[String, String]) => {
  val fl1 = map1.getOrElse("Fld1", "unknown")
  val fl2 = map1.getOrElse("Fld2", "unknown")
  if (fl1 == "Germany") {
    Some(Map("key1" -> "G"))
  } else if (fl1 == "France") {
    if (fl2 == "UK") {
      Some(Map("key1" -> "U"))
    } else {
      Some(Map("key1" -> "Y"))
    }
  } else if (fl1 == "France") { // unreachable: "France" is already handled above (kept from the question)
    Some(Map("key1" -> "G"))
  } else {
    None // no condition matched: mymap is null for this row
  }
})
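A possible variation on the Option approach, not taken from either answer: keep the rules in a plain Scala function with an explicit return type and express the branches as a match on the (Fld1, Fld2) pair, with a catch-all case returning None. The names MapRules and classify are hypothetical. The unreachable second "France" branch from the question is dropped, since the match would never reach it either.

```scala
object MapRules {
  // Explicit return type: any branch that does not produce an
  // Option[Map[String, String]] is a compile error instead of widening to Any.
  def classify(map1: Map[String, String]): Option[Map[String, String]] = {
    val fl1 = map1.getOrElse("Fld1", "unknown")
    val fl2 = map1.getOrElse("Fld2", "unknown")
    (fl1, fl2) match {
      case ("Germany", _)   => Some(Map("key1" -> "G"))
      case ("France", "UK") => Some(Map("key1" -> "U"))
      case ("France", _)    => Some(Map("key1" -> "Y"))
      case _                => None // no rule matched; Spark would store this as null
    }
  }
}
```

With Spark on the classpath, this could be wrapped as val getmapUdf = udf(MapRules.classify _) and used exactly like the UDF above; keeping the logic outside the udf(...) call also makes it easy to test without a SparkSession.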
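If you want to keep the approach you're using, here is an alternative answer, although @blackbishop's answer covers most of the options for achieving the same result.
To make your code work, you only need to change your UDF so that a final else branch returns a default map, as shown below; then you will no longer get that error.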
val getmapUdf = udf((map1: Map[String, String]) => {
  val fl1 = map1.getOrElse("Fld1", "unknown")
  val fl2 = map1.getOrElse("Fld2", "unknown")
  if (fl1 == "Germany") {
    Map("key1" -> "G")
  } else if (fl1 == "France") {
    if (fl2 == "UK") {
      Map("key1" -> "U")
    } else {
      Map("key1" -> "Y")
    }
  } else if (fl1 == "France") { // unreachable: "France" is already handled above
    Map("key1" -> "G")
  } else {
    // default branch: every path now returns Map[String, String]
    Map("key1" -> "unknown")
  }
})
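One way to make a missing else show up at compile time rather than as a Spark runtime exception is to give the function an explicit type before passing it to udf. A minimal sketch, assuming the same default-map behaviour as above; the names DefaultMapRule and getmap are hypothetical, and the nested if is flattened with && without changing the results.

```scala
object DefaultMapRule {
  // The declared function type pins the result to Map[String, String]:
  // if the final else were removed, this would no longer compile
  // (type mismatch between Unit and Map[String, String]).
  val getmap: Map[String, String] => Map[String, String] = map1 => {
    val fl1 = map1.getOrElse("Fld1", "unknown")
    val fl2 = map1.getOrElse("Fld2", "unknown")
    if (fl1 == "Germany") Map("key1" -> "G")
    else if (fl1 == "France" && fl2 == "UK") Map("key1" -> "U")
    else if (fl1 == "France") Map("key1" -> "Y")
    else Map("key1" -> "unknown")
  }
}
```

Passing it as udf(DefaultMapRule.getmap) lets Spark infer Map[String, String] as the UDF's return type from the declared function type.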