Spark:如何有效地保留重复项(在 Scala 中)?

Spark: How to efficiently have intersections preserving duplicates (in Scala)?

我有2个RDD,每一个都是一组包含重复项的字符串。我想找到两个集合的交集 保留重复项 。示例:

RDD1 : a, b, b, c, c, c, c

RDD2 : a, a, b, c, c

我想要的交集是集合 a, b, c, c 即交集包含每个元素的最少次数,它在两个集合中都存在。

默认的 intersection 转换不保留重复项 AFAIK。有没有办法有效地使用一些其他变换and/or计算交集变换?我试图避免通过算法进行操作,这不太可能像 Spark 方式那样高效。 (对于感兴趣的人,我正在尝试为一组文件计算 Jaccard bag similarity)。

借鉴 intersection 的实现,你可以做如下事情:

(val rdd1 = sc.parallelize(Seq("a", "b", "b", "c", "c", "c", "c")))
(val rdd2 = sc.parallelize(Seq("a", "a", "b", "c", "c")))

val cogrouped = rdd1.map(k => (k, null)).cogroup(rdd2.map(k => (k, null)))
val groupSize = cogrouped.map { case (key, (buf1, buf2)) => (key, math.min(buf1.size, buf2.size)) }
val finalSet = groupSize.flatMap { case (key, size) => List.fill(size)(key) }

(finalSet.collect = Array(a, b, c, c))

这是可行的,因为 cogroup 将为每个分组保留一对值的重复出现(在这种情况下,所有空值)。另请注意,与最初使用 intersection.

相比,我们在这里没有做更多的随机播放