Class 在 Apache Zeppelin 中应用 udf 时未找到
Class not found when applying udf in Apache Zeppelin
后期编辑: 看来问题与 Apache Zeppelin 解释器有关。我在 Spark 1.6.0 上使用 Apache Zeppelin 0.6.0。当 运行在 spark-shell (2.0.0) 中使用相同的代码时没有问题。
这可能有点太具体了,但也许它可以帮助其他遇到类似 UDF 错误的人。
我想要的是基于该 DF 中的不同列和字符串序列在 Spark Dataframe 中创建一个列。
因此,如果列 "location" 中的值在序列 "cities" 中,则创建列 "urban" 并放置 1 否则放置 0.
尝试用几种不同的方法解决它。我犯了同样的错误。最终版本基于这些帖子:
Use of Seq.contains(String)
和
。这就是我现在拥有的:
val cities = Seq("london", "paris")
df.filter(lower($"location") isin (cities : _*)).count()
长 = 5485947
所以我有这两个位置的记录
import org.apache.spark.sql.functions._
val urbanFlag: (String => Int) = (arg: String) => {if (cities.contains(arg)) 1 else 0}
val urbf = udf(urbanFlag)
df.withColumn("urban", urbf(lower($"location"))).show(100)
当我 运行 得到 "Job aborted due to stage failure" 时,错误:
java.lang.ClassNotFoundException:
$iwC$$iwC$$iwC$$iwC$$iwC$$$5d9ae18728ec9520b65ad133e3b55$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun
...和一个巨大的堆栈跟踪。
我猜想有一些关于匿名函数的东西,但是什么?
也许您定义 UDF 的方式有问题?这对我有用:
import org.apache.spark.sql.functions._
val data = sqlContext.read.json(sc.parallelize(Seq("{'location' : 'london'}", "{'location': 'tokyo'}")))
val cities = Seq("london", "paris")
val urbf = udf { city: String => if (cities.contains(city)) 1 else 0 }
data.select($"location", urbf($"location")).show
+--------+-------------+
|location|UDF(location)|
+--------+-------------+
| london| 1|
| tokyo| 0|
+--------+-------------+
请注意,我是直接定义 UDF,即没有中间层。
后期编辑: 看来问题与 Apache Zeppelin 解释器有关。我在 Spark 1.6.0 上使用 Apache Zeppelin 0.6.0。当 运行在 spark-shell (2.0.0) 中使用相同的代码时没有问题。
这可能有点太具体了,但也许它可以帮助其他遇到类似 UDF 错误的人。
我想要的是基于该 DF 中的不同列和字符串序列在 Spark Dataframe 中创建一个列。 因此,如果列 "location" 中的值在序列 "cities" 中,则创建列 "urban" 并放置 1 否则放置 0.
尝试用几种不同的方法解决它。我犯了同样的错误。最终版本基于这些帖子:
Use of Seq.contains(String)
和
val cities = Seq("london", "paris")
df.filter(lower($"location") isin (cities : _*)).count()
长 = 5485947 所以我有这两个位置的记录
import org.apache.spark.sql.functions._
val urbanFlag: (String => Int) = (arg: String) => {if (cities.contains(arg)) 1 else 0}
val urbf = udf(urbanFlag)
df.withColumn("urban", urbf(lower($"location"))).show(100)
当我 运行 得到 "Job aborted due to stage failure" 时,错误:
java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$$5d9ae18728ec9520b65ad133e3b55$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun
...和一个巨大的堆栈跟踪。 我猜想有一些关于匿名函数的东西,但是什么?
也许您定义 UDF 的方式有问题?这对我有用:
import org.apache.spark.sql.functions._
val data = sqlContext.read.json(sc.parallelize(Seq("{'location' : 'london'}", "{'location': 'tokyo'}")))
val cities = Seq("london", "paris")
val urbf = udf { city: String => if (cities.contains(city)) 1 else 0 }
data.select($"location", urbf($"location")).show
+--------+-------------+
|location|UDF(location)|
+--------+-------------+
| london| 1|
| tokyo| 0|
+--------+-------------+
请注意,我是直接定义 UDF,即没有中间层。