无法压缩分区数不相等的 RDD。我可以使用什么来替代 zip?
Can't Zip RDDs with unequal number of partitions. What can I use as an alternative to zip?
我有三个相同大小的 RDD rdd1
包含一个字符串标识符,rdd2
包含一个向量,rdd3
包含一个整数值。
本质上我想将这三个压缩在一起以获得 RDD[String,Vector,Int]
的 RDD,但我不断地得到 can't zip RDDs with unequal number of partitions。我怎样才能完全绕过 zip 来做上面提到的事情?
尝试:
rdd1.zipWithIndex.map(_.swap).join(rdd2.zipWithIndex.map(_.swap)).values
它们的元素个数都一样吗? zipPartitions
用于在 RDD 具有 正好 相同数量的分区和 完全 相同数量的元素的特殊情况下加入 RDD在每个分区中。
你的情况没有这样的保证。如果 rdd3
实际上是空的,你想做什么?你应该得到一个没有元素的结果 RDD 吗?
编辑:如果您知道长度完全相同,LostInOverflow 的答案就可以了。
在拆分原始 RDD 之前,使用 RDD.zipWithUniqueId
为每一行分配一个唯一的 ID。然后确保在您从原始文件中吐出的每个 RDD 中包含 id 字段,并将它们用作这些行的键(如果 id 还不是键,则使用 keyBy
)然后使用 RDD.join
重新排列行。
示例可能如下所示:
val rddWithKey = origionalRdd.zipWithUniqueID().map(_.swap)
val rdd1 = rddWithKey.map{case (key,value) => key -> value.stringField }
val rdd2 = rddWithKey.map{case (key,value) => key -> value.intField }
/*transformations on rdd1 and 2*/
val 重组 = rdd1.join(rdd2)
我有三个相同大小的 RDD rdd1
包含一个字符串标识符,rdd2
包含一个向量,rdd3
包含一个整数值。
本质上我想将这三个压缩在一起以获得 RDD[String,Vector,Int]
的 RDD,但我不断地得到 can't zip RDDs with unequal number of partitions。我怎样才能完全绕过 zip 来做上面提到的事情?
尝试:
rdd1.zipWithIndex.map(_.swap).join(rdd2.zipWithIndex.map(_.swap)).values
它们的元素个数都一样吗? zipPartitions
用于在 RDD 具有 正好 相同数量的分区和 完全 相同数量的元素的特殊情况下加入 RDD在每个分区中。
你的情况没有这样的保证。如果 rdd3
实际上是空的,你想做什么?你应该得到一个没有元素的结果 RDD 吗?
编辑:如果您知道长度完全相同,LostInOverflow 的答案就可以了。
在拆分原始 RDD 之前,使用 RDD.zipWithUniqueId
为每一行分配一个唯一的 ID。然后确保在您从原始文件中吐出的每个 RDD 中包含 id 字段,并将它们用作这些行的键(如果 id 还不是键,则使用 keyBy
)然后使用 RDD.join
重新排列行。
示例可能如下所示:
val rddWithKey = origionalRdd.zipWithUniqueID().map(_.swap)
val rdd1 = rddWithKey.map{case (key,value) => key -> value.stringField }
val rdd2 = rddWithKey.map{case (key,value) => key -> value.intField }
/*transformations on rdd1 and 2*/
val 重组 = rdd1.join(rdd2)