高效的分布式算法,用于合并具有公共元素的集合

Efficient distributed algorithm to merge sets with common elements

我正在 Flink 上进行 MinHash LSH 的分布式实现,作为最后一步,我需要合并一些集群,这些集群被标识为它们之间相似的元素集。

所以我有一组分布式集合作为输入,我需要一种算法来有效地将集合与公共元素合并。鉴于 Flink 的计算模型,该算法可能是迭代的,不一定像 map-reduce。

举个例子:

{{1{1,2}},{2,{2,3}},{3,{4,5},{4{1,27}}}} 结果应该是 {1,2,3,27},{4,5} 因为集合 #1、#2 和 #4 至少有一个共同元素。

只是一个想法,可能会有更好的方法,但是这样怎么样:

  • 在映射步骤中,您为每个集合中的每个元素发出键值对,例如:element -> other elements
  • 在减少步骤中,您收集那些其他元素并丢弃重复项
  • 重复直到数据结构停止变化

第一次迭代后,您的数据将如下所示:

1 -> 2, 27
2 -> 1,3
3 -> 2
4 -> 5
5 -> 4
27 -> 1

第二个之后:

1 -> 2, 3, 27
2 -> 1, 3, 27
3 -> 1, 2
4 -> 5
5 -> 4
27 -> 1, 2

最后在第三个之后:

1 -> 2, 3, 27
2 -> 1, 3, 27
3 -> 1, 2, 27
4 -> 5
5 -> 4
27 -> 1, 2, 3

我目前无法确定更改何时停止。

要只获得每个结果的一个副本,您可以删除所有键大于任何其他元素的地方。

如果您有 N 个集合,每个集合大约有 M 个元素,如果重复很少见,则天真的方法(测试每个集合中的每个元素)是 O(N^2 * M^2)。但是,如果您实际上只有 R << N*M 个不同的元素,那还不错:一旦找到某些东西就可以停止测试,这种情况发生在 N*M 比较之后,只有 R,所以您下降到 "only" O(N*N*R)。但是,如果这些集合实际上只存在于 L 个组中,则您不必将每个集合都相互测试,因为一旦您击中正确的组,您就会停止。所以它更像是 O(N*L*R) + O(N*M)(第二项实际上是在找到要添加到的正确组后将元素添加到组中)。

如果您制作从每个元素到它包含的集合列表的映射——您可以在 O(N*M) 时间内完成——然后您可以遍历每个元素的集合树,访问每个元素不同的元素最多大约一次(即 R 个),并且对于每个访问提到它的每个集合的每个元素(结果大约是 N*M/R)并添加它的所有元素(但只有一次! ),如果您注意不要多次添加同一组,则总共需要 O(N*M) 次。 (你需要一个集合的包装器,这样你就可以知道你是否已经访问过它们。)这样会更快,但如果 L*R 非常小,你可能不在乎。

在 Scala 中,从元素到树的映射核心类似于

case class W(s: Set[Int]) { var visited: Boolean = false }
def tree(ss: Seq[Set[Int]]) = {
  var m = new collection.mutable.HashMap[Int, List[W]]
  ss.foreach{ s =>
    s.foreach{i =>
      m(i) = W(s) :: m.getOrElse(i, Nil)
    }
  }
}

遍历这些组更复杂,但基本思想是保留一张你见过的元素的地图,如果你击中了其中一​​个元素,就不会继续遍历,还要跟踪你是否已经合并元素时,通过在 W 包装器中设置标志来遍历集合。

这是一个想法:Gelly 是 Fl​​ink 的一部分,它有一个连接组件查找器。制作一个图,其中每个集合元素都有一个节点,边以最简单的方式连接每个集合的元素,例如对于 {a, b, c, d, ...} 添加 [a,b], [a,c], [a,d], [a,... 。现在找到连接的组件。他们的节点给了你要找的集合。

编辑 如果您担心从集合到图形再转换回来的性能影响(尽管这种担心是过早的优化;您应该尝试一下),重新实现 Gelly 的集合令牌推送方案就足够简单了。这是如何工作的。您的示例中已经有了标记:集合编号。让 S[i] 设置为 i 在您的示例中显示。例如。 S[1] = {1,2}。设 R 是一个逆多重图,它将每个集合元素带到它所属的集合集合中。例如。 R[2] = {1,2} 在你的例子中。令 T[i] 为集合 i 可通过传递非空交集 "links" 到达的元素。然后计算:

T[i] = S[i] for all i // with no links at all, a set reaches its own elements
loop
  for all i, Tnew[i] = \union_{ x \in T[i] } S[R[x]]  // add new reachables
  exit if Tnew == T
  T = Tnew
end loop

完成后,地图 T 的不同值就是您想要的答案。最大迭代次数应为 log |U|其中 U 是集合元素的范围。