将 Scalaz 与 Spark 结合使用时出现不可序列化异常
Not Serializable Exception when using Scalaz with Spark
我做了一个简单的示例来尝试将 scalaz library 代码与 Apache Spark 1.5 集成。
下面是一个简单的 Spark 程序来说明我的问题:
package test
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.rdd.RDD
import ca.crim.deti.re.spark.sparkConf
import scalaz._
import scalaz.Scalaz._
object TestSpark {
def main(args: Array[String]) = {
val conf = new SparkConf().setAppName("Test").setMaster("local")
val SC = new SparkContext(conf)
val c = SC.parallelize(List(1, 2, 3, 4, 5))
println(func1(c).count) // WORKS
println(func2(c).count) // DOES NOT WORK.. NotSerializableException
}
// WORKS!
def func1(rdd: RDD[Int]) = {
rdd.filter { i => f(i, i) }
}
// DOES NOT WORK!
def func2[I: Equal](rdd: RDD[I]) = {
rdd.filter { i => f(i, i) }
}
def f[I: Equal](i1: I, i2: I) = {
i1 === i2
}
}
我想通过在函数定义中使用 Equal
使 func2
工作。
当使用 func2
在本地模式下在 Spark 环境中执行时,出现以下异常:
问题是 def func2[I: Equal](rdd: RDD[I])
的调用需要范围内的 Equal[I]
的一些实例。由于您使用的是 ScalaZ - 也许它从该库中获取实例,并且显然该实例不可序列化,如堆栈跟踪中所报告的那样。
将您自己的可序列化版本的 Equal[I] 放入作用域,这会有所帮助。
由于您的函数有一个 Equal[I]
约束,Spark 正在关闭它并在进行分发时尝试对其进行序列化。由于 scalaz.Equal
类型 class 不是 Serializable
(https://github.com/scalaz/scalaz/blob/v7.2.0/core/src/main/scala/scalaz/Equal.scala#L10),因此 Spark 在运行时失败。
您可以通过使用 Twitter 的 chill 库中的 MeatLocker
来绕过它不是 Serialziable
的事实:https://github.com/twitter/chill#the-meatlocker
或者,cats and algebra 库(有一个 Equal
类型 class 类似于你上面使用的那个)有序列化类型 classes 你应该能够像上面那样使用那些没有问题。
我做了一个简单的示例来尝试将 scalaz library 代码与 Apache Spark 1.5 集成。
下面是一个简单的 Spark 程序来说明我的问题:
package test
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.rdd.RDD
import ca.crim.deti.re.spark.sparkConf
import scalaz._
import scalaz.Scalaz._
object TestSpark {
def main(args: Array[String]) = {
val conf = new SparkConf().setAppName("Test").setMaster("local")
val SC = new SparkContext(conf)
val c = SC.parallelize(List(1, 2, 3, 4, 5))
println(func1(c).count) // WORKS
println(func2(c).count) // DOES NOT WORK.. NotSerializableException
}
// WORKS!
def func1(rdd: RDD[Int]) = {
rdd.filter { i => f(i, i) }
}
// DOES NOT WORK!
def func2[I: Equal](rdd: RDD[I]) = {
rdd.filter { i => f(i, i) }
}
def f[I: Equal](i1: I, i2: I) = {
i1 === i2
}
}
我想通过在函数定义中使用 Equal
使 func2
工作。
当使用 func2
在本地模式下在 Spark 环境中执行时,出现以下异常:
问题是 def func2[I: Equal](rdd: RDD[I])
的调用需要范围内的 Equal[I]
的一些实例。由于您使用的是 ScalaZ - 也许它从该库中获取实例,并且显然该实例不可序列化,如堆栈跟踪中所报告的那样。
将您自己的可序列化版本的 Equal[I] 放入作用域,这会有所帮助。
由于您的函数有一个 Equal[I]
约束,Spark 正在关闭它并在进行分发时尝试对其进行序列化。由于 scalaz.Equal
类型 class 不是 Serializable
(https://github.com/scalaz/scalaz/blob/v7.2.0/core/src/main/scala/scalaz/Equal.scala#L10),因此 Spark 在运行时失败。
您可以通过使用 Twitter 的 chill 库中的 MeatLocker
来绕过它不是 Serialziable
的事实:https://github.com/twitter/chill#the-meatlocker
或者,cats and algebra 库(有一个 Equal
类型 class 类似于你上面使用的那个)有序列化类型 classes 你应该能够像上面那样使用那些没有问题。