RDD 中的重复行
duplicated rows in RDD
我在spark中遇到了以下问题:
...
while(...){
key = filtersIterator.next()
pricesOverLeadTimesKeyFiltered = pricesOverLeadTimesFilteredMap_cached
.filter(x => x._1.equals(key))
.values
resultSRB = processLinearBreakevens(pricesOverLeadTimesKeyFiltered)
resultsSRB = resultsSRB.union(resultSRB)
}
....
通过这种方式,我在resultsSRB中积累了相同的resultSRB。
但是这里有 "some" 技巧,允许我为每次迭代添加 different/right resultSRB
- 在每次
processLinearBreakevens(..)
调用后调用 resultSRB.collect()
或 resultSRB.foreach(println)
或 println(resultSRB.count)
- 在
processLinearBreakevens(..)
开头对pricesOverLeadTimesKeyFiltered
执行相同的操作
看来我需要确保所有的操作必须是"flushed"才能执行并集。我已经尝试通过一个临时变量合并,或者坚持 resultSRB,或者坚持 pricesOverLeadTimesKeyFiltered
但仍然是同样的问题。
你能帮帮我吗?
迈克尔
如果我的假设是正确的;所有这些都是var
,那么问题就出在闭包上。 key
需要是 val
,因为它被懒惰地捕获到您的 filter
中。所以,当它最终被执行时,过滤总是使用 key
的最后状态
我的例子:
def process(filtered : RDD[Int]) = filtered.map(x=> x+1)
var i = 1
var key = 1
var filtered = sc.parallelize(List[Int]())
var result = sc.parallelize(List[Int]())
var results = sc.parallelize(List[Int]())
val cached = sc.parallelize(1 to 1000).map(x=>(x, x)).persist
while(i <= 3){
key = i * 10
val filtered = cached
.filter(x => x._1.equals(key))
.values
val result = process(filtered)
results = results.union(result)
i = i + 1
}
results.collect
//expect 11,21,31 but get 31, 31, 31
要修复它,请在 while
循环中将 key
更改为 val
,然后将得到您预期的 11,21,31
我在spark中遇到了以下问题:
...
while(...){
key = filtersIterator.next()
pricesOverLeadTimesKeyFiltered = pricesOverLeadTimesFilteredMap_cached
.filter(x => x._1.equals(key))
.values
resultSRB = processLinearBreakevens(pricesOverLeadTimesKeyFiltered)
resultsSRB = resultsSRB.union(resultSRB)
}
....
通过这种方式,我在resultsSRB中积累了相同的resultSRB。 但是这里有 "some" 技巧,允许我为每次迭代添加 different/right resultSRB
- 在每次
processLinearBreakevens(..)
调用后调用resultSRB.collect()
或resultSRB.foreach(println)
或println(resultSRB.count)
- 在
processLinearBreakevens(..)
开头对
pricesOverLeadTimesKeyFiltered
执行相同的操作
看来我需要确保所有的操作必须是"flushed"才能执行并集。我已经尝试通过一个临时变量合并,或者坚持 resultSRB,或者坚持 pricesOverLeadTimesKeyFiltered
但仍然是同样的问题。
你能帮帮我吗? 迈克尔
如果我的假设是正确的;所有这些都是var
,那么问题就出在闭包上。 key
需要是 val
,因为它被懒惰地捕获到您的 filter
中。所以,当它最终被执行时,过滤总是使用 key
我的例子:
def process(filtered : RDD[Int]) = filtered.map(x=> x+1)
var i = 1
var key = 1
var filtered = sc.parallelize(List[Int]())
var result = sc.parallelize(List[Int]())
var results = sc.parallelize(List[Int]())
val cached = sc.parallelize(1 to 1000).map(x=>(x, x)).persist
while(i <= 3){
key = i * 10
val filtered = cached
.filter(x => x._1.equals(key))
.values
val result = process(filtered)
results = results.union(result)
i = i + 1
}
results.collect
//expect 11,21,31 but get 31, 31, 31
要修复它,请在 while
循环中将 key
更改为 val
,然后将得到您预期的 11,21,31