修改spark(scala)中对象的RDD
modifying RDD of object in spark (scala)
我有:
val rdd1: RDD[myClass]
它已经初始化,我在调试时检查了所有成员都有它们的默认值
如果我这样做
rdd1.foreach(x=>x.modifier())
其中修饰符是myClass的成员函数,它修改了一些成员变量
执行此操作后,如果我检查 RDD 中的值,它们没有被修改。
有人可以解释一下这是怎么回事吗?
是否可以确保在 RDD 中修改值?
编辑:
class myClass(var id:String,var sessions: Buffer[Long],var avgsession: Long) {
def calcAvg(){
// calculate avg by summing over sessions and dividing by legnth
// Store this average in avgsession
}
}
如果我这样做,avgsession 属性不会更新
myrdd.foreach(x=>x.calcAvg())
RDD 是不可变的,对它包含的对象调用变异方法不会有任何影响。
获得你想要的结果的方法是生成MyClass
的新副本而不是修改实例:
case class MyClass(id:String, avgsession: Long) {
def modifier(a: Int):MyClass =
this.copy(avgsession = this.avgsession + a)
}
现在您仍然无法更新 rdd1,但您可以获得包含更新实例的 rdd2:
rdd2 = rdd1.map (_.modifier(18) )
我发现像您这样的代码在 spark/yarn 中 运行 时调用 RDD.persist 后可以正常工作。这可能是 unsupported/accidental 行为,您应该避免它 - 但它是一种可能在紧要关头有所帮助的解决方法。我是 运行 版本 1.5.0。
对象是不可变的。通过使用 map,您可以迭代 rdd 和 return 一个新的。
val rdd2 = rdd1.map(x=>x.modifier())
这个问题的答案比这里最初接受的答案稍微微妙一些。原始答案仅对于未缓存在内存中的数据是正确的。缓存在内存中的 RDD 数据也可以在内存中发生变异,即使 RDD 应该是不可变的,变异也会保留。考虑以下示例:
val rdd = sc.parallelize(Seq(new mutable.HashSet[Int]()))
rdd.foreach(_+=1)
rdd.collect.foreach(println)
如果你 运行 这个例子你会得到 Set()
作为结果就像原来的答案状态。
但是,如果您要 运行 与缓存调用完全相同的事情:
val rdd = sc.parallelize(Seq(new mutable.HashSet[Int]()))
rdd.cache
rdd.foreach(_+=1)
rdd.collect.foreach(println)
现在结果将打印为 Set(1)
。所以这取决于数据是否缓存在内存中。如果 spark 从源代码重新计算或从磁盘上的序列化副本读取,那么它总是会重置回原始对象并且看起来是不可变的,但如果它不是从序列化形式加载,那么突变实际上会保留。
我有:
val rdd1: RDD[myClass]
它已经初始化,我在调试时检查了所有成员都有它们的默认值
如果我这样做
rdd1.foreach(x=>x.modifier())
其中修饰符是myClass的成员函数,它修改了一些成员变量
执行此操作后,如果我检查 RDD 中的值,它们没有被修改。
有人可以解释一下这是怎么回事吗? 是否可以确保在 RDD 中修改值?
编辑:
class myClass(var id:String,var sessions: Buffer[Long],var avgsession: Long) {
def calcAvg(){
// calculate avg by summing over sessions and dividing by legnth
// Store this average in avgsession
}
}
如果我这样做,avgsession 属性不会更新
myrdd.foreach(x=>x.calcAvg())
RDD 是不可变的,对它包含的对象调用变异方法不会有任何影响。
获得你想要的结果的方法是生成MyClass
的新副本而不是修改实例:
case class MyClass(id:String, avgsession: Long) {
def modifier(a: Int):MyClass =
this.copy(avgsession = this.avgsession + a)
}
现在您仍然无法更新 rdd1,但您可以获得包含更新实例的 rdd2:
rdd2 = rdd1.map (_.modifier(18) )
我发现像您这样的代码在 spark/yarn 中 运行 时调用 RDD.persist 后可以正常工作。这可能是 unsupported/accidental 行为,您应该避免它 - 但它是一种可能在紧要关头有所帮助的解决方法。我是 运行 版本 1.5.0。
对象是不可变的。通过使用 map,您可以迭代 rdd 和 return 一个新的。
val rdd2 = rdd1.map(x=>x.modifier())
这个问题的答案比这里最初接受的答案稍微微妙一些。原始答案仅对于未缓存在内存中的数据是正确的。缓存在内存中的 RDD 数据也可以在内存中发生变异,即使 RDD 应该是不可变的,变异也会保留。考虑以下示例:
val rdd = sc.parallelize(Seq(new mutable.HashSet[Int]()))
rdd.foreach(_+=1)
rdd.collect.foreach(println)
如果你 运行 这个例子你会得到 Set()
作为结果就像原来的答案状态。
但是,如果您要 运行 与缓存调用完全相同的事情:
val rdd = sc.parallelize(Seq(new mutable.HashSet[Int]()))
rdd.cache
rdd.foreach(_+=1)
rdd.collect.foreach(println)
现在结果将打印为 Set(1)
。所以这取决于数据是否缓存在内存中。如果 spark 从源代码重新计算或从磁盘上的序列化副本读取,那么它总是会重置回原始对象并且看起来是不可变的,但如果它不是从序列化形式加载,那么突变实际上会保留。