不可序列化对象和函数的 Spark Scala 编程

Spark Scala Programming for not serializable objects and functions

当我 运行 Spark Scala 程序

时出现 "Task not serializable" 异常

我的代码是这样的

object Main{
    def main(args : Array(String){
        ...
        var rdd = sc.textFile(filename)
                  .map(line => new NotSerializableJClass(line)).cache() 
        //rdd is RDD[NotSerializableJClass]
        ...
        var test = new NotSerializableJPredicate()
        rdd = rdd.filter(elem => test.test(elem))
        //throws TaskNotSerializable on test Predicate class
    }
}

我注意到我可以用

解决第二部分
rdd = rdd.filter(elem => (new NotSerializableJPredicate()).test(elem))

但我仍然得到 RDD 中对象 class 的异常。而且我会以另一种方式也以另一种方式编写第二部分,只是因为我不想创建大量的 PredicateClass 对象。

你能帮帮我吗?我怎样才能继续使用不可序列化 class?

一些帮助我避免任务序列化问题的一般规则:

如果您正在从您的代码中调用任何 class 的方法;Spark 将需要序列化包含 method.Ways 的整个 class 可以是以下任何一种: a> 在NotSerializableClass中将方法声明为函数变量;所以不要写: def foo(x:Int)={blah blah} 尝试使用 val foo = (x:Int)=>{blah blah } 所以; spark 现在不再需要序列化整个 class。 b> 在某些情况下,重构代码以在单独的 class 中提取相关部分可能是可行的方法。 c>将class中实际上不需要的对象标记为@transient并标记class可序列化

RDD 必须是可序列化的,因此您不能创建不可序列化的 RDD class。

对于您的谓词,您可以使用 mapPartitions 编写它。

rdd.mapPartitions{
  part => 
    val test = new NotSerializableJPredicate()
    part.filter{elem => test.test(elem)}
   }

mapPartitons 将为每个分区 运行 一次,因此它允许您在执行程序上实例化不可序列化的 classes,但它只需要为每个分区而不是为每条记录执行一次.