修改spark(scala)中对象的RDD

modifying RDD of object in spark (scala)

我有:

val rdd1: RDD[myClass]

它已经初始化,我在调试时检查了所有成员都有它们的默认值

如果我这样做

rdd1.foreach(x=>x.modifier())

其中修饰符是myClass的成员函数,它修改了一些成员变量

执行此操作后,如果我检查 RDD 中的值,它们没有被修改。

有人可以解释一下这是怎么回事吗? 是否可以确保在 RDD 中修改值?

编辑:

class myClass(var id:String,var sessions: Buffer[Long],var avgsession: Long)  {
    def calcAvg(){
   // calculate avg by summing over sessions and dividing by legnth
   // Store this average in avgsession
    }
}

如果我这样做,avgsession 属性不会更新

myrdd.foreach(x=>x.calcAvg())

RDD 是不可变的,对它包含的对象调用变异方法不会有任何影响。

获得你想要的结果的方法是生成MyClass的新副本而不是修改实例:

case class MyClass(id:String, avgsession: Long) {
    def modifier(a: Int):MyClass = 
       this.copy(avgsession = this.avgsession + a) 
}

现在您仍然无法更新 rdd1,但您可以获得包含更新实例的 rdd2:

rdd2 = rdd1.map (_.modifier(18) ) 

我发现像您这样的代码在 spark/yarn 中 运行 时调用 RDD.persist 后可以正常工作。这可能是 unsupported/accidental 行为,您应该避免它 - 但它是一种可能在紧要关头有所帮助的解决方法。我是 运行 版本 1.5.0。

对象是不可变的。通过使用 map,您可以迭代 rdd 和 return 一个新的。

val rdd2 = rdd1.map(x=>x.modifier())

这个问题的答案比这里最初接受的答案稍微微妙一些。原始答案仅对于未缓存在内存中的数据是正确的。缓存在内存中的 RDD 数据也可以在内存中发生变异,即使 RDD 应该是不可变的,变异也会保留。考虑以下示例:

val rdd = sc.parallelize(Seq(new mutable.HashSet[Int]()))
rdd.foreach(_+=1)
rdd.collect.foreach(println)

如果你 运行 这个例子你会得到 Set() 作为结果就像原来的答案状态。

但是,如果您要 运行 与缓存调用完全相同的事情:

val rdd = sc.parallelize(Seq(new mutable.HashSet[Int]()))
rdd.cache
rdd.foreach(_+=1)
rdd.collect.foreach(println)

现在结果将打印为 Set(1)。所以这取决于数据是否缓存在内存中。如果 spark 从源代码重新计算或从磁盘上的序列化副本读取,那么它总是会重置回原始对象并且看起来是不可变的,但如果它不是从序列化形式加载,那么突变实际上会保留。