加入两个(非)成对的 RDD 来制作一个 DataFrame
Join two (non)paired RDDs to make a DataFrame
如标题所述,假设我有两个 RDD
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize([1,0,0])
或
rdd3 = sc.parallelize([("Id", 1),("Id", 2),("Id",3)])
rdd4 = sc.parallelize([("Result", 1),("Result", 0),("Result", 0)])
如何创建以下 DataFrame?
Id Result
1 1
2 0
3 0
如果我可以创建成对的 RDD [(1,1),(2,0),(3,0)] 那么 sqlCtx.createDataFrame
会给我我想要的,但我不知道怎么样?
如果有任何评论或帮助,我将不胜感激!
只要他们有相同的partitioner,每个partition的元素个数相同,就可以使用zip
函数,例如
case class Elem(id: Int, result: Int)
val df = sqlCtx.createDataFrame(rdd1.zip(rdd2).map(x => Elem(x._1, x._2)))
所以首先,有一个名为 RDD.zipWithIndex
的 RDD 操作。如果你调用 rdd2.zipWithIndex
你会得到:
scala> rdd2.zipWithIndex collect() foreach println
(1,0)
(0,1)
(0,2)
如果你想让它看起来像你的,只需这样做:
scala> rdd2.zipWithIndex map(t => (t._2 + 1,t._1)) collect() foreach println
(1,1)
(2,0)
(3,0)
如果你真的需要压缩这两个RDD,那么就用RDD.zip
scala> rdd1.zip(rdd2) collect() foreach println
(1,1)
(2,0)
(3,0)
如标题所述,假设我有两个 RDD
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize([1,0,0])
或
rdd3 = sc.parallelize([("Id", 1),("Id", 2),("Id",3)])
rdd4 = sc.parallelize([("Result", 1),("Result", 0),("Result", 0)])
如何创建以下 DataFrame?
Id Result
1 1
2 0
3 0
如果我可以创建成对的 RDD [(1,1),(2,0),(3,0)] 那么 sqlCtx.createDataFrame
会给我我想要的,但我不知道怎么样?
如果有任何评论或帮助,我将不胜感激!
只要他们有相同的partitioner,每个partition的元素个数相同,就可以使用zip
函数,例如
case class Elem(id: Int, result: Int)
val df = sqlCtx.createDataFrame(rdd1.zip(rdd2).map(x => Elem(x._1, x._2)))
所以首先,有一个名为 RDD.zipWithIndex
的 RDD 操作。如果你调用 rdd2.zipWithIndex
你会得到:
scala> rdd2.zipWithIndex collect() foreach println
(1,0)
(0,1)
(0,2)
如果你想让它看起来像你的,只需这样做:
scala> rdd2.zipWithIndex map(t => (t._2 + 1,t._1)) collect() foreach println
(1,1)
(2,0)
(3,0)
如果你真的需要压缩这两个RDD,那么就用RDD.zip
scala> rdd1.zip(rdd2) collect() foreach println
(1,1)
(2,0)
(3,0)