高效的分布式算法,用于合并具有公共元素的集合
Efficient distributed algorithm to merge sets with common elements
我正在 Flink 上进行 MinHash LSH 的分布式实现,作为最后一步,我需要合并一些集群,这些集群被标识为它们之间相似的元素集。
所以我有一组分布式集合作为输入,我需要一种算法来有效地将集合与公共元素合并。鉴于 Flink 的计算模型,该算法可能是迭代的,不一定像 map-reduce。
举个例子:
从 {{1{1,2}},{2,{2,3}},{3,{4,5},{4{1,27}}}}
结果应该是 {1,2,3,27},{4,5}
因为集合 #1、#2 和 #4 至少有一个共同元素。
只是一个想法,可能会有更好的方法,但是这样怎么样:
- 在映射步骤中,您为每个集合中的每个元素发出键值对,例如:
element -> other elements
- 在减少步骤中,您收集那些其他元素并丢弃重复项
- 重复直到数据结构停止变化
第一次迭代后,您的数据将如下所示:
1 -> 2, 27
2 -> 1,3
3 -> 2
4 -> 5
5 -> 4
27 -> 1
第二个之后:
1 -> 2, 3, 27
2 -> 1, 3, 27
3 -> 1, 2
4 -> 5
5 -> 4
27 -> 1, 2
最后在第三个之后:
1 -> 2, 3, 27
2 -> 1, 3, 27
3 -> 1, 2, 27
4 -> 5
5 -> 4
27 -> 1, 2, 3
我目前无法确定更改何时停止。
要只获得每个结果的一个副本,您可以删除所有键大于任何其他元素的地方。
如果您有 N
个集合,每个集合大约有 M
个元素,如果重复很少见,则天真的方法(测试每个集合中的每个元素)是 O(N^2 * M^2)
。但是,如果您实际上只有 R << N*M
个不同的元素,那还不错:一旦找到某些东西就可以停止测试,这种情况发生在 N*M
比较之后,只有 R
,所以您下降到 "only" O(N*N*R)
。但是,如果这些集合实际上只存在于 L
个组中,则您不必将每个集合都相互测试,因为一旦您击中正确的组,您就会停止。所以它更像是 O(N*L*R) + O(N*M)
(第二项实际上是在找到要添加到的正确组后将元素添加到组中)。
如果您制作从每个元素到它包含的集合列表的映射——您可以在 O(N*M)
时间内完成——然后您可以遍历每个元素的集合树,访问每个元素不同的元素最多大约一次(即 R
个),并且对于每个访问提到它的每个集合的每个元素(结果大约是 N*M/R
)并添加它的所有元素(但只有一次! ),如果您注意不要多次添加同一组,则总共需要 O(N*M)
次。 (你需要一个集合的包装器,这样你就可以知道你是否已经访问过它们。)这样会更快,但如果 L*R
非常小,你可能不在乎。
在 Scala 中,从元素到树的映射核心类似于
case class W(s: Set[Int]) { var visited: Boolean = false }
def tree(ss: Seq[Set[Int]]) = {
var m = new collection.mutable.HashMap[Int, List[W]]
ss.foreach{ s =>
s.foreach{i =>
m(i) = W(s) :: m.getOrElse(i, Nil)
}
}
}
遍历这些组更复杂,但基本思想是保留一张你见过的元素的地图,如果你击中了其中一个元素,就不会继续遍历,还要跟踪你是否已经合并元素时,通过在 W 包装器中设置标志来遍历集合。
这是一个想法:Gelly 是 Flink 的一部分,它有一个连接组件查找器。制作一个图,其中每个集合元素都有一个节点,边以最简单的方式连接每个集合的元素,例如对于 {a, b, c, d, ...} 添加 [a,b], [a,c], [a,d], [a,... 。现在找到连接的组件。他们的节点给了你要找的集合。
编辑
如果您担心从集合到图形再转换回来的性能影响(尽管这种担心是过早的优化;您应该尝试一下),重新实现 Gelly 的集合令牌推送方案就足够简单了。这是如何工作的。您的示例中已经有了标记:集合编号。让 S[i] 设置为 i 在您的示例中显示。例如。 S[1] = {1,2}。设 R 是一个逆多重图,它将每个集合元素带到它所属的集合集合中。例如。 R[2] = {1,2} 在你的例子中。令 T[i] 为集合 i 可通过传递非空交集 "links" 到达的元素。然后计算:
T[i] = S[i] for all i // with no links at all, a set reaches its own elements
loop
for all i, Tnew[i] = \union_{ x \in T[i] } S[R[x]] // add new reachables
exit if Tnew == T
T = Tnew
end loop
完成后,地图 T 的不同值就是您想要的答案。最大迭代次数应为 log |U|其中 U 是集合元素的范围。
我正在 Flink 上进行 MinHash LSH 的分布式实现,作为最后一步,我需要合并一些集群,这些集群被标识为它们之间相似的元素集。
所以我有一组分布式集合作为输入,我需要一种算法来有效地将集合与公共元素合并。鉴于 Flink 的计算模型,该算法可能是迭代的,不一定像 map-reduce。
举个例子:
从 {{1{1,2}},{2,{2,3}},{3,{4,5},{4{1,27}}}}
结果应该是 {1,2,3,27},{4,5}
因为集合 #1、#2 和 #4 至少有一个共同元素。
只是一个想法,可能会有更好的方法,但是这样怎么样:
- 在映射步骤中,您为每个集合中的每个元素发出键值对,例如:
element -> other elements
- 在减少步骤中,您收集那些其他元素并丢弃重复项
- 重复直到数据结构停止变化
第一次迭代后,您的数据将如下所示:
1 -> 2, 27
2 -> 1,3
3 -> 2
4 -> 5
5 -> 4
27 -> 1
第二个之后:
1 -> 2, 3, 27
2 -> 1, 3, 27
3 -> 1, 2
4 -> 5
5 -> 4
27 -> 1, 2
最后在第三个之后:
1 -> 2, 3, 27
2 -> 1, 3, 27
3 -> 1, 2, 27
4 -> 5
5 -> 4
27 -> 1, 2, 3
我目前无法确定更改何时停止。
要只获得每个结果的一个副本,您可以删除所有键大于任何其他元素的地方。
如果您有 N
个集合,每个集合大约有 M
个元素,如果重复很少见,则天真的方法(测试每个集合中的每个元素)是 O(N^2 * M^2)
。但是,如果您实际上只有 R << N*M
个不同的元素,那还不错:一旦找到某些东西就可以停止测试,这种情况发生在 N*M
比较之后,只有 R
,所以您下降到 "only" O(N*N*R)
。但是,如果这些集合实际上只存在于 L
个组中,则您不必将每个集合都相互测试,因为一旦您击中正确的组,您就会停止。所以它更像是 O(N*L*R) + O(N*M)
(第二项实际上是在找到要添加到的正确组后将元素添加到组中)。
如果您制作从每个元素到它包含的集合列表的映射——您可以在 O(N*M)
时间内完成——然后您可以遍历每个元素的集合树,访问每个元素不同的元素最多大约一次(即 R
个),并且对于每个访问提到它的每个集合的每个元素(结果大约是 N*M/R
)并添加它的所有元素(但只有一次! ),如果您注意不要多次添加同一组,则总共需要 O(N*M)
次。 (你需要一个集合的包装器,这样你就可以知道你是否已经访问过它们。)这样会更快,但如果 L*R
非常小,你可能不在乎。
在 Scala 中,从元素到树的映射核心类似于
case class W(s: Set[Int]) { var visited: Boolean = false }
def tree(ss: Seq[Set[Int]]) = {
var m = new collection.mutable.HashMap[Int, List[W]]
ss.foreach{ s =>
s.foreach{i =>
m(i) = W(s) :: m.getOrElse(i, Nil)
}
}
}
遍历这些组更复杂,但基本思想是保留一张你见过的元素的地图,如果你击中了其中一个元素,就不会继续遍历,还要跟踪你是否已经合并元素时,通过在 W 包装器中设置标志来遍历集合。
这是一个想法:Gelly 是 Flink 的一部分,它有一个连接组件查找器。制作一个图,其中每个集合元素都有一个节点,边以最简单的方式连接每个集合的元素,例如对于 {a, b, c, d, ...} 添加 [a,b], [a,c], [a,d], [a,... 。现在找到连接的组件。他们的节点给了你要找的集合。
编辑 如果您担心从集合到图形再转换回来的性能影响(尽管这种担心是过早的优化;您应该尝试一下),重新实现 Gelly 的集合令牌推送方案就足够简单了。这是如何工作的。您的示例中已经有了标记:集合编号。让 S[i] 设置为 i 在您的示例中显示。例如。 S[1] = {1,2}。设 R 是一个逆多重图,它将每个集合元素带到它所属的集合集合中。例如。 R[2] = {1,2} 在你的例子中。令 T[i] 为集合 i 可通过传递非空交集 "links" 到达的元素。然后计算:
T[i] = S[i] for all i // with no links at all, a set reaches its own elements
loop
for all i, Tnew[i] = \union_{ x \in T[i] } S[R[x]] // add new reachables
exit if Tnew == T
T = Tnew
end loop
完成后,地图 T 的不同值就是您想要的答案。最大迭代次数应为 log |U|其中 U 是集合元素的范围。