Spark SQL UDF 返回带有 df.WithColumn() 的 Scala 不可变映射

Spark SQL UDF returning scala immutable Map with df.WithColumn()

我有案例class

case class MyCaseClass(City : String, Extras : Map[String, String])

和用户定义函数 returns scala.collection.immutable.Map

def extrasUdf = spark.udf.register(
   "extras_udf", 
   (age : Int, name : String) => Map("age" -> age.toString, "name" -> name)
)

但这会因异常而中断:

import spark.implicits._

spark.read.options(...).load(...)
      .select('City, 'Age, 'Name)
      .withColumn("Extras", extrasUdf('Age, 'Name))
      .drop('Age)
      .drop('Name)
      .as[MyCaseClass]

我应该使用 spark sql 的 MapType(DataTypes.StringType, DataTypes.IntegerType) 但我找不到任何工作示例...

如果我使用 scala.collection.Map 这会起作用,但我需要不可变的 Map

你的代码有很多问题:

  • 您正在使用 def extrastUdf =,它创建了一个用于注册 UDF 的函数,而不是实际上 creating/registering UDF。请改用 val extrasUdf =

  • 您在地图中混合了值类型(StringInt),这使得地图成为 Map[String, Any],因为 AnyStringInt 的公共超类。 Spark 不支持 Any。您至少可以做两件事:(a) 切换到使用字符串映射(使用 age.toString,在这种情况下您不需要 UDF,因为您可以简单地使用 map()) or (b) switch to using named structs using named_struct()(同样,没有需要一个 UDF)。通常,如果你不能用现有的函数做你需要做的事情,只写一个 UDF。我更喜欢看 Hive 文档,因为 Spark 文档相当稀疏。

  • 此外,请记住 Spark 模式中的类型规范(例如,MapType)与 Scala 类型(例如,Map[_, _])完全不同,并且与类型的方式分开在内部表示并在 Scala 和 Spark 数据结构之间映射。换句话说,这与可变与不可变集合无关。

希望对您有所帮助!