RDD 中的重复行

duplicated rows in RDD

我在spark中遇到了以下问题:

...
while(...){
    key = filtersIterator.next()
    pricesOverLeadTimesKeyFiltered = pricesOverLeadTimesFilteredMap_cached
       .filter(x => x._1.equals(key))
       .values
    resultSRB = processLinearBreakevens(pricesOverLeadTimesKeyFiltered)
    resultsSRB = resultsSRB.union(resultSRB)
}
....

通过这种方式,我在resultsSRB中积累了相同的resultSRB。 但是这里有 "some" 技巧,允许我为每次迭代添加 different/right resultSRB

看来我需要确保所有的操作必须是"flushed"才能执行并集。我已经尝试通过一个临时变量合并,或者坚持 resultSRB,或者坚持 pricesOverLeadTimesKeyFiltered 但仍然是同样的问题。

你能帮帮我吗? 迈克尔

如果我的假设是正确的;所有这些都是var,那么问题就出在闭包上。 key 需要是 val,因为它被懒惰地捕获到您的 filter 中。所以,当它最终被执行时,过滤总是使用 key

的最后状态

我的例子:

def process(filtered : RDD[Int]) = filtered.map(x=> x+1)

var i = 1
var key  = 1
var filtered = sc.parallelize(List[Int]())
var result = sc.parallelize(List[Int]())
var results = sc.parallelize(List[Int]())
val cached = sc.parallelize(1 to 1000).map(x=>(x, x)).persist
while(i <= 3){
    key = i * 10
    val filtered = cached
       .filter(x => x._1.equals(key))
       .values
    val result = process(filtered)
    results = results.union(result)
    i = i + 1
}
results.collect
//expect 11,21,31 but get 31, 31, 31

要修复它,请在 while 循环中将 key 更改为 val,然后将得到您预期的 11,21,31