Apache Spark RDD - 不更新
Apache Spark RDD - not updating
我创建了一个包含 Vector 的 PairRDD。
var newRDD = oldRDD.mapValues(listOfItemsAndRatings => Vector(Array.fill(2){math.random}))
稍后我会更新 RDD:
newRDD.lookup(ratingObject.user)(0) += 0.2 * (errorRate(rating) * myVector)
然而,尽管它输出更新的 Vector(如控制台中所示),但当我下次调用 newRDD
时,我可以看到 Vector 值已更改。通过测试我得出结论,它已更改为 math.random
给出的内容 - 因为每次我调用 newRDD
Vector 都会更改。我知道有一个谱系图,也许与它有关。我需要将 RDD 中保存的 Vector 更新为新值,我需要重复执行此操作。
谢谢。
RDD 是不可变结构,旨在通过集群分布对数据的操作。
有两个因素在您在这里观察到的行为中发挥作用:
每次都可以计算RDD谱系。在这种情况下,这意味着对 newRDD 的操作可能会触发沿袭计算,因此每次应用 Vector(Array.fill(2){math.random})
转换并产生新值。可以使用 cache
打破沿袭,在这种情况下,转换的值将在第一次应用后保存在内存 and/or 磁盘中。
这导致:
val randomVectorRDD = oldRDD.mapValues(listOfItemsAndRatings => Vector(Array.fill(2){math.random}))
randomVectorRDD.cache()
第二个需要进一步考虑的方面是现场突变:
newRDD.lookup(ratingObject.user)(0) += 0.2 * (errorRate(rating) * myVector)
虽然这可能适用于单台机器,因为所有 Vector 引用都是本地的,但它不会扩展到集群,因为查找引用将被序列化并且不会保留突变。因此它带有为什么要使用Spark的问题。
要在 Spark 上实现,该算法将需要重新设计,以便用转换而不是准时的方式表达 lookup/mutations。
我创建了一个包含 Vector 的 PairRDD。
var newRDD = oldRDD.mapValues(listOfItemsAndRatings => Vector(Array.fill(2){math.random}))
稍后我会更新 RDD:
newRDD.lookup(ratingObject.user)(0) += 0.2 * (errorRate(rating) * myVector)
然而,尽管它输出更新的 Vector(如控制台中所示),但当我下次调用 newRDD
时,我可以看到 Vector 值已更改。通过测试我得出结论,它已更改为 math.random
给出的内容 - 因为每次我调用 newRDD
Vector 都会更改。我知道有一个谱系图,也许与它有关。我需要将 RDD 中保存的 Vector 更新为新值,我需要重复执行此操作。
谢谢。
RDD 是不可变结构,旨在通过集群分布对数据的操作。 有两个因素在您在这里观察到的行为中发挥作用:
每次都可以计算RDD谱系。在这种情况下,这意味着对 newRDD 的操作可能会触发沿袭计算,因此每次应用 Vector(Array.fill(2){math.random})
转换并产生新值。可以使用 cache
打破沿袭,在这种情况下,转换的值将在第一次应用后保存在内存 and/or 磁盘中。
这导致:
val randomVectorRDD = oldRDD.mapValues(listOfItemsAndRatings => Vector(Array.fill(2){math.random}))
randomVectorRDD.cache()
第二个需要进一步考虑的方面是现场突变:
newRDD.lookup(ratingObject.user)(0) += 0.2 * (errorRate(rating) * myVector)
虽然这可能适用于单台机器,因为所有 Vector 引用都是本地的,但它不会扩展到集群,因为查找引用将被序列化并且不会保留突变。因此它带有为什么要使用Spark的问题。
要在 Spark 上实现,该算法将需要重新设计,以便用转换而不是准时的方式表达 lookup/mutations。