将元组列表作为参数传递给scala中的spark udf
Passing a list of tuples as a parameter to a spark udf in scala
我正在尝试将元组列表传递给 scala 中的 udf。我不确定如何为此准确定义数据类型。我试图将它作为一整行传递,但它无法真正解决它。我需要根据元组的第一个元素对列表进行排序,然后发回 n 个元素。我已经为 udf
尝试了以下定义
def udfFilterPath = udf((id: Long, idList: Array[structType[Long, String]] )
def udfFilterPath = udf((id: Long, idList: Array[Tuple2[Long, String]] )
def udfFilterPath = udf((id: Long, idList: Row)
这是 idList 的样子:
[[1234,"Tony"], [2345, "Angela"]]
[[1234,"Tony"], [234545, "Ruby"], [353445, "Ria"]]
这是一个像上面一样有 100 行的数据框。我调用 udf 如下:
testSet.select("id", "idList").withColumn("result", udfFilterPath($"id", $"idList")).show
当我打印数据框的架构时,它会将其读取为结构数组。 idList 本身是通过对按键分组并存储在数据框中的一列元组执行收集列表生成的。关于我做错了什么的任何想法?谢谢!
定义 UDF 时,您应该使用普通的 Scala 类型(例如元组、基元...)并且 而不是 Spark SQL 类型(例如 StructType
) 作为 输出类型 。
至于 input 类型——这是它变得棘手的地方(而且没有很好的记录)——元组数组实际上是 mutable.WrappedArray[Row]
。所以 - 你必须先 "convert" 每行到一个元组中,然后你可以进行排序和 return 结果。
最后,根据您的描述,似乎根本没有使用 id
列,因此我将其从 UDF 定义中删除,但可以轻松添加回来。
val udfFilterPath = udf { idList: mutable.WrappedArray[Row] =>
// converts the array items into tuples, sorts by first item and returns first two tuples:
idList.map(r => (r.getAs[Long](0), r.getAs[String](1))).sortBy(_._1).take(2)
}
df.withColumn("result", udfFilterPath($"idList")).show(false)
+------+-------------------------------------------+----------------------------+
|id |idList |result |
+------+-------------------------------------------+----------------------------+
|1234 |[[1234,Tony], [2345,Angela]] |[[1234,Tony], [2345,Angela]]|
|234545|[[1234,Tony], [2345454,Ruby], [353445,Ria]]|[[1234,Tony], [353445,Ria]] |
+------+-------------------------------------------+----------------------------+
我正在尝试将元组列表传递给 scala 中的 udf。我不确定如何为此准确定义数据类型。我试图将它作为一整行传递,但它无法真正解决它。我需要根据元组的第一个元素对列表进行排序,然后发回 n 个元素。我已经为 udf
尝试了以下定义def udfFilterPath = udf((id: Long, idList: Array[structType[Long, String]] )
def udfFilterPath = udf((id: Long, idList: Array[Tuple2[Long, String]] )
def udfFilterPath = udf((id: Long, idList: Row)
这是 idList 的样子:
[[1234,"Tony"], [2345, "Angela"]]
[[1234,"Tony"], [234545, "Ruby"], [353445, "Ria"]]
这是一个像上面一样有 100 行的数据框。我调用 udf 如下:
testSet.select("id", "idList").withColumn("result", udfFilterPath($"id", $"idList")).show
当我打印数据框的架构时,它会将其读取为结构数组。 idList 本身是通过对按键分组并存储在数据框中的一列元组执行收集列表生成的。关于我做错了什么的任何想法?谢谢!
定义 UDF 时,您应该使用普通的 Scala 类型(例如元组、基元...)并且 而不是 Spark SQL 类型(例如 StructType
) 作为 输出类型 。
至于 input 类型——这是它变得棘手的地方(而且没有很好的记录)——元组数组实际上是 mutable.WrappedArray[Row]
。所以 - 你必须先 "convert" 每行到一个元组中,然后你可以进行排序和 return 结果。
最后,根据您的描述,似乎根本没有使用 id
列,因此我将其从 UDF 定义中删除,但可以轻松添加回来。
val udfFilterPath = udf { idList: mutable.WrappedArray[Row] =>
// converts the array items into tuples, sorts by first item and returns first two tuples:
idList.map(r => (r.getAs[Long](0), r.getAs[String](1))).sortBy(_._1).take(2)
}
df.withColumn("result", udfFilterPath($"idList")).show(false)
+------+-------------------------------------------+----------------------------+
|id |idList |result |
+------+-------------------------------------------+----------------------------+
|1234 |[[1234,Tony], [2345,Angela]] |[[1234,Tony], [2345,Angela]]|
|234545|[[1234,Tony], [2345454,Ruby], [353445,Ria]]|[[1234,Tony], [353445,Ria]] |
+------+-------------------------------------------+----------------------------+