不可序列化对象和函数的 Spark Scala 编程
Spark Scala Programming for not serializable objects and functions
当我 运行 Spark Scala 程序
时出现 "Task not serializable" 异常
- Spark RDDs 是不可序列化类型 (java class)
- 调用的函数来自不可序列化的 class(java class,再次)
我的代码是这样的
object Main{
def main(args : Array(String){
...
var rdd = sc.textFile(filename)
.map(line => new NotSerializableJClass(line)).cache()
//rdd is RDD[NotSerializableJClass]
...
var test = new NotSerializableJPredicate()
rdd = rdd.filter(elem => test.test(elem))
//throws TaskNotSerializable on test Predicate class
}
}
我注意到我可以用
解决第二部分
rdd = rdd.filter(elem => (new NotSerializableJPredicate()).test(elem))
但我仍然得到 RDD 中对象 class 的异常。而且我会以另一种方式也以另一种方式编写第二部分,只是因为我不想创建大量的 PredicateClass 对象。
你能帮帮我吗?我怎样才能继续使用不可序列化 class?
一些帮助我避免任务序列化问题的一般规则:
如果您正在从您的代码中调用任何 class 的方法;Spark 将需要序列化包含 method.Ways 的整个 class 可以是以下任何一种:
a> 在NotSerializableClass中将方法声明为函数变量;所以不要写:
def foo(x:Int)={blah blah} 尝试使用 val foo = (x:Int)=>{blah blah }
所以; spark 现在不再需要序列化整个 class。
b> 在某些情况下,重构代码以在单独的 class 中提取相关部分可能是可行的方法。
c>将class中实际上不需要的对象标记为@transient并标记class可序列化
RDD 必须是可序列化的,因此您不能创建不可序列化的 RDD class。
对于您的谓词,您可以使用 mapPartitions 编写它。
rdd.mapPartitions{
part =>
val test = new NotSerializableJPredicate()
part.filter{elem => test.test(elem)}
}
mapPartitons 将为每个分区 运行 一次,因此它允许您在执行程序上实例化不可序列化的 classes,但它只需要为每个分区而不是为每条记录执行一次.
当我 运行 Spark Scala 程序
时出现 "Task not serializable" 异常- Spark RDDs 是不可序列化类型 (java class)
- 调用的函数来自不可序列化的 class(java class,再次)
我的代码是这样的
object Main{
def main(args : Array(String){
...
var rdd = sc.textFile(filename)
.map(line => new NotSerializableJClass(line)).cache()
//rdd is RDD[NotSerializableJClass]
...
var test = new NotSerializableJPredicate()
rdd = rdd.filter(elem => test.test(elem))
//throws TaskNotSerializable on test Predicate class
}
}
我注意到我可以用
解决第二部分rdd = rdd.filter(elem => (new NotSerializableJPredicate()).test(elem))
但我仍然得到 RDD 中对象 class 的异常。而且我会以另一种方式也以另一种方式编写第二部分,只是因为我不想创建大量的 PredicateClass 对象。
你能帮帮我吗?我怎样才能继续使用不可序列化 class?
一些帮助我避免任务序列化问题的一般规则:
如果您正在从您的代码中调用任何 class 的方法;Spark 将需要序列化包含 method.Ways 的整个 class 可以是以下任何一种: a> 在NotSerializableClass中将方法声明为函数变量;所以不要写: def foo(x:Int)={blah blah} 尝试使用 val foo = (x:Int)=>{blah blah } 所以; spark 现在不再需要序列化整个 class。 b> 在某些情况下,重构代码以在单独的 class 中提取相关部分可能是可行的方法。 c>将class中实际上不需要的对象标记为@transient并标记class可序列化
RDD 必须是可序列化的,因此您不能创建不可序列化的 RDD class。
对于您的谓词,您可以使用 mapPartitions 编写它。
rdd.mapPartitions{
part =>
val test = new NotSerializableJPredicate()
part.filter{elem => test.test(elem)}
}
mapPartitons 将为每个分区 运行 一次,因此它允许您在执行程序上实例化不可序列化的 classes,但它只需要为每个分区而不是为每条记录执行一次.