如何在 udf 和 return 多变量值中循环数组
how do I loop a array in udf and return multiple variable value
我刚接触 scala 和 udf,现在我想编写一个 udf,它接受来自数据框列的 3 个参数(其中一个是数组),for..loop 当前数组,解析和 return 一个 case class 后面会用到。这是我的大致代码:
case class NewFeatures(dd: Boolean, zz: String)
val resultUdf = udf((arrays: Option[Row], jsonData: String, placement: Int) => {
for (item <- arrays) {
val aa = item.getAs[Long]("aa")
val bb = item.getAs[Long]("bb")
breakable {
if (aa <= 0 || bb <= 0) break
}
val cc = item.getAs[Long]("cc")
val dd = cc > 0
val jsonData = item.getAs[String]("json_data")
val jsonDataObject = JSON.parseFull(jsonData).asInstanceOf[Map[String, Any]]
var zz = jsonDataObject.getOrElse("zz", "").toString
NewFeatures(dd, zz)
}
})
当我运行它时,它会得到异常:
java.lang.UnsupportedOperationException: Schema for type Unit is not supported
上面udf应该怎么修改
首先,尝试为您的变量更好地命名,例如在您的情况下,“arrays”的类型为 Option[Row]
。在这里,for (item <- arrays) {...}
基本上是一个 .map
方法,在 Options 上使用 map,你应该提供一个函数,它使用 Row 和 returns 某种类型的值(~= 签名:def map[V](f: Row => V): Option[V]
,你想要什么:def map(f: Row => NewFeatures): Option[NewFeature]
)。当您在某些情况下打破此映射时,编译器无法保证映射方法中的函数始终 return NewFeatures 的实例。所以它是 Unit(在某些情况下它只有 returns,而不是全部)。
你想做的事情可以通过类似这样的方式得到增强:
val funcName: (Option[Row], String, Int) => Option[NewFeatures] =
(rowOpt, jsonData, placement) => rowOpt.filter(
/* your break condition */
).map { row => // if passes the filter predicate =>
// fetch data from row, create new instance
}
我刚接触 scala 和 udf,现在我想编写一个 udf,它接受来自数据框列的 3 个参数(其中一个是数组),for..loop 当前数组,解析和 return 一个 case class 后面会用到。这是我的大致代码:
case class NewFeatures(dd: Boolean, zz: String)
val resultUdf = udf((arrays: Option[Row], jsonData: String, placement: Int) => {
for (item <- arrays) {
val aa = item.getAs[Long]("aa")
val bb = item.getAs[Long]("bb")
breakable {
if (aa <= 0 || bb <= 0) break
}
val cc = item.getAs[Long]("cc")
val dd = cc > 0
val jsonData = item.getAs[String]("json_data")
val jsonDataObject = JSON.parseFull(jsonData).asInstanceOf[Map[String, Any]]
var zz = jsonDataObject.getOrElse("zz", "").toString
NewFeatures(dd, zz)
}
})
当我运行它时,它会得到异常:
java.lang.UnsupportedOperationException: Schema for type Unit is not supported
上面udf应该怎么修改
首先,尝试为您的变量更好地命名,例如在您的情况下,“arrays”的类型为 Option[Row]
。在这里,for (item <- arrays) {...}
基本上是一个 .map
方法,在 Options 上使用 map,你应该提供一个函数,它使用 Row 和 returns 某种类型的值(~= 签名:def map[V](f: Row => V): Option[V]
,你想要什么:def map(f: Row => NewFeatures): Option[NewFeature]
)。当您在某些情况下打破此映射时,编译器无法保证映射方法中的函数始终 return NewFeatures 的实例。所以它是 Unit(在某些情况下它只有 returns,而不是全部)。
你想做的事情可以通过类似这样的方式得到增强:
val funcName: (Option[Row], String, Int) => Option[NewFeatures] =
(rowOpt, jsonData, placement) => rowOpt.filter(
/* your break condition */
).map { row => // if passes the filter predicate =>
// fetch data from row, create new instance
}