如何在spark中将地图传递给UDF
How to pass in a map into UDF in spark
这是我的问题,我有一张 Map[Array[String],String]
的地图,我想将其传递到 UDF 中。
这是我的 UDF:
def lookup(lookupMap:Map[Array[String],String]) =
udf((input:Array[String]) => lookupMap.lift(input))
这是我的地图变量:
val srdd = df.rdd.map { row => (
Array(row.getString(1),row.getString(5),row.getString(8)).map(_.toString),
row.getString(7)
)}
我是这样调用函数的:
val combinedDF = dftemp.withColumn("a",lookup(lookupMap))(Array($"b",$"c","d"))
我首先得到一个关于不可变数组的错误,所以我将我的数组更改为不可变类型,然后我得到一个关于类型不匹配的错误。我用谷歌搜索了一下,显然我不能将非列类型直接传递到 UDF 中。有人可以帮忙吗?荣誉。
更新:所以我确实将所有内容都转换为包装数组。这是我所做的:
val srdd = df.rdd.map{row => (WrappedArray.make[String](Array(row.getString(1),row.getString(5),row.getString(8))),row.getString(7))}
val lookupMap = srdd.collectAsMap()
def lookup(lookupMap:Map[collection.mutable.WrappedArray[String],String]) = udf((input:collection.mutable.WrappedArray[String]) => lookupMap.lift(input))
val combinedDF = dftemp.withColumn("a",lookup(lookupMap))(Array($"b",$"c",$"d"))
现在我遇到这样的错误:
required: Map[scala.collection.mutable.WrappedArray[String],String]
-ksh: Map[scala.collection.mutable.WrappedArray[String],String]: not found [No such file or directory]
我试过这样做:
val m = collection.immutable.Map(1->"one",2->"Two")
val n = collection.mutable.Map(m.toSeq: _*)
但后来我又回到了列类型的错误。
首先,你必须传递一个Column
作为UDF的参数;由于您希望此参数是一个数组,因此您应该使用 org.apache.spark.sql.functions
中的 array
函数,它从一系列其他 Columns 中创建一个数组 Column。所以 UDF 调用将是:
lookup(lookupMap)(array($"b",$"c",$"d"))
现在,由于数组列被反序列化为 mutable.WrappedArray
,为了使地图查找成功,您最好确保这是您的 UDF 使用的类型:
def lookup(lookupMap: Map[mutable.WrappedArray[String],String]) =
udf((input: mutable.WrappedArray[String]) => lookupMap.lift(input))
所以一共:
import spark.implicits._
import org.apache.spark.sql.functions._
// Create an RDD[(mutable.WrappedArray[String], String)]:
val srdd = df.rdd.map { row: Row => (
mutable.WrappedArray.make[String](Array(row.getString(1), row.getString(5), row.getString(8))),
row.getString(7)
)}
// collect it into a map (I assume this is what you're doing with srdd...)
val lookupMap: Map[mutable.WrappedArray[String], String] = srdd.collectAsMap()
def lookup(lookupMap: Map[mutable.WrappedArray[String],String]) =
udf((input: mutable.WrappedArray[String]) => lookupMap.lift(input))
val combinedDF = dftemp.withColumn("a",lookup(lookupMap)(array($"b",$"c",$"d")))
Anna,您 srdd/lookupmap 的代码类型为 org.apache.spark.rdd.RDD[(Array[String], String)]
val srdd = df.rdd.map { row => (
Array(row.getString(1),row.getString(5),row.getString(8)).map(_.toString),
row.getString(7)
)}
在查找方法中,您希望将 Map 作为参数
def lookup(lookupMap:Map[Array[String],String]) =
udf((input:Array[String]) => lookupMap.lift(input))
这就是您收到类型不匹配错误的原因。
首先将 srdd 从 RDD[tuple] 转换为 RDD[Map],然后尝试将 RDD 转换为 Map 以解决此错误。
val srdd = df.rdd.map { row => Map(
Array(row.getString(1),row.getString(5),row.getString(8)).map(_.toString) ->
row.getString(7)
)}
这是我的问题,我有一张 Map[Array[String],String]
的地图,我想将其传递到 UDF 中。
这是我的 UDF:
def lookup(lookupMap:Map[Array[String],String]) =
udf((input:Array[String]) => lookupMap.lift(input))
这是我的地图变量:
val srdd = df.rdd.map { row => (
Array(row.getString(1),row.getString(5),row.getString(8)).map(_.toString),
row.getString(7)
)}
我是这样调用函数的:
val combinedDF = dftemp.withColumn("a",lookup(lookupMap))(Array($"b",$"c","d"))
我首先得到一个关于不可变数组的错误,所以我将我的数组更改为不可变类型,然后我得到一个关于类型不匹配的错误。我用谷歌搜索了一下,显然我不能将非列类型直接传递到 UDF 中。有人可以帮忙吗?荣誉。
更新:所以我确实将所有内容都转换为包装数组。这是我所做的:
val srdd = df.rdd.map{row => (WrappedArray.make[String](Array(row.getString(1),row.getString(5),row.getString(8))),row.getString(7))}
val lookupMap = srdd.collectAsMap()
def lookup(lookupMap:Map[collection.mutable.WrappedArray[String],String]) = udf((input:collection.mutable.WrappedArray[String]) => lookupMap.lift(input))
val combinedDF = dftemp.withColumn("a",lookup(lookupMap))(Array($"b",$"c",$"d"))
现在我遇到这样的错误:
required: Map[scala.collection.mutable.WrappedArray[String],String] -ksh: Map[scala.collection.mutable.WrappedArray[String],String]: not found [No such file or directory]
我试过这样做:
val m = collection.immutable.Map(1->"one",2->"Two")
val n = collection.mutable.Map(m.toSeq: _*)
但后来我又回到了列类型的错误。
首先,你必须传递一个Column
作为UDF的参数;由于您希望此参数是一个数组,因此您应该使用 org.apache.spark.sql.functions
中的 array
函数,它从一系列其他 Columns 中创建一个数组 Column。所以 UDF 调用将是:
lookup(lookupMap)(array($"b",$"c",$"d"))
现在,由于数组列被反序列化为 mutable.WrappedArray
,为了使地图查找成功,您最好确保这是您的 UDF 使用的类型:
def lookup(lookupMap: Map[mutable.WrappedArray[String],String]) =
udf((input: mutable.WrappedArray[String]) => lookupMap.lift(input))
所以一共:
import spark.implicits._
import org.apache.spark.sql.functions._
// Create an RDD[(mutable.WrappedArray[String], String)]:
val srdd = df.rdd.map { row: Row => (
mutable.WrappedArray.make[String](Array(row.getString(1), row.getString(5), row.getString(8))),
row.getString(7)
)}
// collect it into a map (I assume this is what you're doing with srdd...)
val lookupMap: Map[mutable.WrappedArray[String], String] = srdd.collectAsMap()
def lookup(lookupMap: Map[mutable.WrappedArray[String],String]) =
udf((input: mutable.WrappedArray[String]) => lookupMap.lift(input))
val combinedDF = dftemp.withColumn("a",lookup(lookupMap)(array($"b",$"c",$"d")))
Anna,您 srdd/lookupmap 的代码类型为 org.apache.spark.rdd.RDD[(Array[String], String)]
val srdd = df.rdd.map { row => (
Array(row.getString(1),row.getString(5),row.getString(8)).map(_.toString),
row.getString(7)
)}
在查找方法中,您希望将 Map 作为参数
def lookup(lookupMap:Map[Array[String],String]) =
udf((input:Array[String]) => lookupMap.lift(input))
这就是您收到类型不匹配错误的原因。
首先将 srdd 从 RDD[tuple] 转换为 RDD[Map],然后尝试将 RDD 转换为 Map 以解决此错误。
val srdd = df.rdd.map { row => Map(
Array(row.getString(1),row.getString(5),row.getString(8)).map(_.toString) ->
row.getString(7)
)}