如何在spark中将地图传递给UDF

How to pass in a map into UDF in spark

这是我的问题,我有一张 Map[Array[String],String] 的地图,我想将其传递到 UDF 中。

这是我的 UDF:

def lookup(lookupMap:Map[Array[String],String]) = 
  udf((input:Array[String]) => lookupMap.lift(input))

这是我的地图变量:

val srdd = df.rdd.map { row => (
  Array(row.getString(1),row.getString(5),row.getString(8)).map(_.toString),  
  row.getString(7)
)}

我是这样调用函数的:

val combinedDF  = dftemp.withColumn("a",lookup(lookupMap))(Array($"b",$"c","d"))

我首先得到一个关于不可变数组的错误,所以我将我的数组更改为不可变类型,然后我得到一个关于类型不匹配的错误。我用谷歌搜索了一下,显然我不能将非列类型直接传递到 UDF 中。有人可以帮忙吗?荣誉。


更新:所以我确实将所有内容都转换为包装数组。这是我所做的:

val srdd = df.rdd.map{row => (WrappedArray.make[String](Array(row.getString(1),row.getString(5),row.getString(8))),row.getString(7))}

val lookupMap = srdd.collectAsMap()


def lookup(lookupMap:Map[collection.mutable.WrappedArray[String],String]) = udf((input:collection.mutable.WrappedArray[String]) => lookupMap.lift(input))


val combinedDF  = dftemp.withColumn("a",lookup(lookupMap))(Array($"b",$"c",$"d"))

现在我遇到这样的错误:

required: Map[scala.collection.mutable.WrappedArray[String],String] -ksh: Map[scala.collection.mutable.WrappedArray[String],String]: not found [No such file or directory]

我试过这样做:

val m = collection.immutable.Map(1->"one",2->"Two")
val n = collection.mutable.Map(m.toSeq: _*) 

但后来我又回到了列类型的错误。

首先,你必须传递一个Column作为UDF的参数;由于您希望此参数是一个数组,因此您应该使用 org.apache.spark.sql.functions 中的 array 函数,它从一系列其他 Columns 中创建一个数组 Column。所以 UDF 调用将是:

lookup(lookupMap)(array($"b",$"c",$"d"))

现在,由于数组列被反序列化为 mutable.WrappedArray,为了使地图查找成功,您最好确保这是您的 UDF 使用的类型:

def lookup(lookupMap: Map[mutable.WrappedArray[String],String]) =
  udf((input: mutable.WrappedArray[String]) => lookupMap.lift(input))

所以一共:

import spark.implicits._
import org.apache.spark.sql.functions._

// Create an RDD[(mutable.WrappedArray[String], String)]:
val srdd = df.rdd.map { row: Row => (
  mutable.WrappedArray.make[String](Array(row.getString(1), row.getString(5), row.getString(8))), 
  row.getString(7)
)}

// collect it into a map (I assume this is what you're doing with srdd...)
val lookupMap: Map[mutable.WrappedArray[String], String] = srdd.collectAsMap()

def lookup(lookupMap: Map[mutable.WrappedArray[String],String]) =
  udf((input: mutable.WrappedArray[String]) => lookupMap.lift(input))

val combinedDF  = dftemp.withColumn("a",lookup(lookupMap)(array($"b",$"c",$"d")))

Anna,您 srdd/lookupmap 的代码类型为 org.apache.spark.rdd.RDD[(Array[String], String)]

val srdd = df.rdd.map { row => (
Array(row.getString(1),row.getString(5),row.getString(8)).map(_.toString),  
  row.getString(7)
)}

在查找方法中,您希望将 Map 作为参数

def lookup(lookupMap:Map[Array[String],String]) = 
udf((input:Array[String]) => lookupMap.lift(input))

这就是您收到类型不匹配错误的原因。

首先将 srdd 从 RDD[tuple] 转换为 RDD[Map],然后尝试将 RDD 转换为 Map 以解决此错误。

val srdd = df.rdd.map { row => Map(
Array(row.getString(1),row.getString(5),row.getString(8)).map(_.toString) ->
  row.getString(7)
)}