无法压缩分区数不相等的 RDD

Can't zip RDDs with unequal numbers of partitions

现在我有 3 个这样的 RDD:

rdd1:

1 2
3 4
5 6
7 8
9 10

rdd2:

11 12
13 14

rdd3:

15 16
17 18
19 20

我想这样做:

rdd1.zip(rdd2.union(rdd3))

我想要的结果是这样的:

1 2 11 12
3 4 13 14
5 6 15 16
7 8 17 18
9 10 19 20

但我有一个例外:

Exception in thread "main" java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions

有人告诉我我可以毫无例外地做到这一点:

rdd1.zip(rdd2.union(rdd3).repartition(1))

不过好像有点小成本。所以我想知道有没有其他方法可以解决这个问题

我不确定您所说的 "cost" 是什么意思,但是您怀疑 repartition(1) 不是正确的解决方案是对的。它会将 RDD 重新分区到单个分区。

  • 如果您的数据不适合单台机器,这将失败。
  • 它仅在 rdd1 具有单个分区时有效。当您有更多数据时,这可能不再适用。
  • repartition 执行 shuffle,因此您的数据最终可以以不同的方式排序。

我认为正确的解决方案是放弃使用 zip,因为您可能无法确保分区匹配。创建一个密钥并使用 join 代替:

val indexedRDD1 = rdd1.zipWithIndex.map { case (v, i) => i -> v }
val indexedRDD2 = rdd2.zipWithIndex.map { case (v, i) => i -> v }
val offset = rdd2.count
val indexedRDD3 = rdd3.zipWithIndex.map { case (v, i) => (i + offset) -> v }
val combined =
  indexedRDD1.leftOuterJoin(indexedRDD2).leftOuterJoin(indexedRDD3).map {
    case (i, ((v1, v2Opt), v3Opt)) => i -> (v1, v2Opt.getOrElse(v3Opt.get))
  }

无论分区如何,这都有效。如果你喜欢,你可以对结果进行排序并删除最后的索引:

val unindexed = combined.sortByKey().values