Spark zipPartitions 在同一个 RDD 上
Spark zipPartitions on the same RDD
我是 Spark 的新手,在执行 cartesian
之类的操作时遇到了一些问题,但仅限于同一分区。也许一个例子可以清楚地说明我想做什么:假设我们有一个用 sc.parallelize(1,2,3,4,5,6)
制作的 RDD,这个 RDD 分为三个分区,分别包含: (1,2)
; (3,4)
; (5,6)
。比我想获得以下结果: ((1,1),(1,2),(2,1),(2,2))
; ((3,3),(3,4),(4,3),(4,4))
; ((5,5),(5,6),(6,5),(6,6))
.
到目前为止我尝试过的是:
partitionedData.zipPartitions(partitionedData)((aiter, biter) => {
var res = new ListBuffer[(Double,Double)]()
while(aiter.hasNext){
val a = aiter.next()
while(biter.hasNext){
val b = biter.next()
res+=(a,b)
}
}
res.iterator
})
但它不起作用,因为 aiter
和 biter
是同一个迭代器...所以我只得到结果的第一行。
有人可以帮助我吗?
谢谢。
使用RDD.mapPartitions
:
val rdd = sc.parallelize(1 to 6, 3)
val res = rdd.mapPartitions { iter =>
val seq = iter.toSeq
val res = for (a <- seq; b <- seq) yield (a, b)
res.iterator
}
res.collect
打印:
res0: Array[(Int, Int)] = Array((1,1), (1,2), (2,1), (2,2), (3,3), (3,4), (4,3), (4,4), (5,5), (5,6), (6,5), (6,6))
我是 Spark 的新手,在执行 cartesian
之类的操作时遇到了一些问题,但仅限于同一分区。也许一个例子可以清楚地说明我想做什么:假设我们有一个用 sc.parallelize(1,2,3,4,5,6)
制作的 RDD,这个 RDD 分为三个分区,分别包含: (1,2)
; (3,4)
; (5,6)
。比我想获得以下结果: ((1,1),(1,2),(2,1),(2,2))
; ((3,3),(3,4),(4,3),(4,4))
; ((5,5),(5,6),(6,5),(6,6))
.
到目前为止我尝试过的是:
partitionedData.zipPartitions(partitionedData)((aiter, biter) => {
var res = new ListBuffer[(Double,Double)]()
while(aiter.hasNext){
val a = aiter.next()
while(biter.hasNext){
val b = biter.next()
res+=(a,b)
}
}
res.iterator
})
但它不起作用,因为 aiter
和 biter
是同一个迭代器...所以我只得到结果的第一行。
有人可以帮助我吗?
谢谢。
使用RDD.mapPartitions
:
val rdd = sc.parallelize(1 to 6, 3)
val res = rdd.mapPartitions { iter =>
val seq = iter.toSeq
val res = for (a <- seq; b <- seq) yield (a, b)
res.iterator
}
res.collect
打印:
res0: Array[(Int, Int)] = Array((1,1), (1,2), (2,1), (2,2), (3,3), (3,4), (4,3), (4,4), (5,5), (5,6), (6,5), (6,6))