Spark SQL UDF 返回带有 df.WithColumn() 的 Scala 不可变映射
Spark SQL UDF returning scala immutable Map with df.WithColumn()
我有案例class
case class MyCaseClass(City : String, Extras : Map[String, String])
和用户定义函数 returns scala.collection.immutable.Map
def extrasUdf = spark.udf.register(
"extras_udf",
(age : Int, name : String) => Map("age" -> age.toString, "name" -> name)
)
但这会因异常而中断:
import spark.implicits._
spark.read.options(...).load(...)
.select('City, 'Age, 'Name)
.withColumn("Extras", extrasUdf('Age, 'Name))
.drop('Age)
.drop('Name)
.as[MyCaseClass]
我应该使用 spark sql 的 MapType(DataTypes.StringType, DataTypes.IntegerType)
但我找不到任何工作示例...
如果我使用 scala.collection.Map 这会起作用,但我需要不可变的 Map
你的代码有很多问题:
您正在使用 def extrastUdf =
,它创建了一个用于注册 UDF 的函数,而不是实际上 creating/registering UDF。请改用 val extrasUdf =
。
您在地图中混合了值类型(String
和 Int
),这使得地图成为 Map[String, Any]
,因为 Any
是String
和 Int
的公共超类。 Spark 不支持 Any
。您至少可以做两件事:(a) 切换到使用字符串映射(使用 age.toString
,在这种情况下您不需要 UDF,因为您可以简单地使用 map()
) or (b) switch to using named structs using named_struct()
(同样,没有需要一个 UDF)。通常,如果你不能用现有的函数做你需要做的事情,只写一个 UDF。我更喜欢看 Hive 文档,因为 Spark 文档相当稀疏。
此外,请记住 Spark 模式中的类型规范(例如,MapType
)与 Scala 类型(例如,Map[_, _]
)完全不同,并且与类型的方式分开在内部表示并在 Scala 和 Spark 数据结构之间映射。换句话说,这与可变与不可变集合无关。
希望对您有所帮助!
我有案例class
case class MyCaseClass(City : String, Extras : Map[String, String])
和用户定义函数 returns scala.collection.immutable.Map
def extrasUdf = spark.udf.register(
"extras_udf",
(age : Int, name : String) => Map("age" -> age.toString, "name" -> name)
)
但这会因异常而中断:
import spark.implicits._
spark.read.options(...).load(...)
.select('City, 'Age, 'Name)
.withColumn("Extras", extrasUdf('Age, 'Name))
.drop('Age)
.drop('Name)
.as[MyCaseClass]
我应该使用 spark sql 的 MapType(DataTypes.StringType, DataTypes.IntegerType) 但我找不到任何工作示例...
如果我使用 scala.collection.Map 这会起作用,但我需要不可变的 Map
你的代码有很多问题:
您正在使用
def extrastUdf =
,它创建了一个用于注册 UDF 的函数,而不是实际上 creating/registering UDF。请改用val extrasUdf =
。您在地图中混合了值类型(
String
和Int
),这使得地图成为Map[String, Any]
,因为Any
是String
和Int
的公共超类。 Spark 不支持Any
。您至少可以做两件事:(a) 切换到使用字符串映射(使用age.toString
,在这种情况下您不需要 UDF,因为您可以简单地使用map()
) or (b) switch to using named structs usingnamed_struct()
(同样,没有需要一个 UDF)。通常,如果你不能用现有的函数做你需要做的事情,只写一个 UDF。我更喜欢看 Hive 文档,因为 Spark 文档相当稀疏。此外,请记住 Spark 模式中的类型规范(例如,
MapType
)与 Scala 类型(例如,Map[_, _]
)完全不同,并且与类型的方式分开在内部表示并在 Scala 和 Spark 数据结构之间映射。换句话说,这与可变与不可变集合无关。
希望对您有所帮助!