Class 在 Apache Zeppelin 中应用 udf 时未找到

Class not found when applying udf in Apache Zeppelin

后期编辑: 看来问题与 Apache Zeppelin 解释器有关。我在 Spark 1.6.0 上使用 Apache Zeppelin 0.6.0。当 运行在 spark-shell (2.0.0) 中使用相同的代码时没有问题。

这可能有点太具体了,但也许它可以帮助其他遇到类似 UDF 错误的人。

我想要的是基于该 DF 中的不同列和字符串序列在 Spark Dataframe 中创建一个列。 因此,如果列 "location" 中的值在序列 "cities" 中,则创建列 "urban" 并放置 1 否则放置 0.

尝试用几种不同的方法解决它。我犯了同样的错误。最终版本基于这些帖子: Use of Seq.contains(String)。这就是我现在拥有的:

val cities = Seq("london", "paris")
df.filter(lower($"location") isin (cities : _*)).count()

长 = 5485947 所以我有这两个位置的记录

import org.apache.spark.sql.functions._
val urbanFlag: (String => Int) = (arg: String) => {if (cities.contains(arg)) 1 else 0}
val urbf = udf(urbanFlag)
df.withColumn("urban", urbf(lower($"location"))).show(100)

当我 运行 得到 "Job aborted due to stage failure" 时,错误:

java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$$5d9ae18728ec9520b65ad133e3b55$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun

...和一个巨大的堆栈跟踪。 我猜想有一些关于匿名函数的东西,但是什么?

也许您定义 UDF 的方式有问题?这对我有用:

import org.apache.spark.sql.functions._

val data = sqlContext.read.json(sc.parallelize(Seq("{'location' : 'london'}", "{'location': 'tokyo'}")))

val cities = Seq("london", "paris")
val urbf = udf { city: String => if (cities.contains(city)) 1 else 0 }

data.select($"location", urbf($"location")).show

+--------+-------------+
|location|UDF(location)|
+--------+-------------+
|  london|            1|
|   tokyo|            0|
+--------+-------------+

请注意,我是直接定义 UDF,即没有中间层。