使用 Python 实现 Kahn 的拓扑排序算法
Implement Kahn's topological sorting algorithm using Python
Kahn在62中提出了一个算法topologically sort任意DAG(有向无环图),伪代码复制自维基百科:
L ← Empty list that will contain the sorted elements
S ← Set of all nodes with no incoming edges
while S is non-empty do
remove a node n from S
add n to tail of L
for each node m with an edge e from n to m do
remove edge e from the graph # This is a DESTRUCTIVE step!
if m has no other incoming edges then
insert m into S if graph has edges then
return error (graph has at least one cycle) else
return L (a topologically sorted order)
我需要使用 IPython3 实现它,DAG 的实现如下:
class Node(object):
def __init__(self, name, parents):
assert isinstance(name, str)
assert all(isinstance(_, RandomVariable) for _ in parents)
self.name, self.parents = name, parents
其中 name
是节点的标签,parents
存储其所有父节点。那么DAGclass实现为:
class DAG(object):
def __init__(self, *nodes):
assert all(isinstance(_, Node) for _ in nodes)
self.nodes = nodes
(DAG 实现是固定的,不会改进。) 然后我需要将 Kahn 的算法实现为一个函数 top_order
,它接受一个 DAG 实例和returns 像 (node_1, node_2, ..., node_n)
这样的排序。主要问题是,该算法具有破坏性,因为它的步骤之一是 remove edge e from the graph
(第 5 行),它将删除 m.parents
的一个成员。但是,我必须完整保留 DAG 实例。
到目前为止我能想到的一种方法是创建一个 deep 的 DAG 实例副本(即使是浅副本也不能完成这项工作,因为算法仍然通过引用销毁原始实例),并对这个副本执行破坏算法,然后得到这个副本的节点名称的正确排序(假设节点之间没有命名冲突),然后使用这个名称排序来推断原始实例节点的正确排序,大致如下:
def top_order(network):
'''takes in a DAG, prints and returns a topological ordering.'''
assert type(network) == DAG
temp = copy.deepcopy(network) # to leave the original instance intact
ordering_name = []
roots = [node for node in temp.nodes if not node.parents]
while roots:
n_node = roots[0]
del roots[0]
ordering_name.append(n_node.name)
for m_node in temp.nodes:
if n_node in m_node.parents:
temp_list = list(m_node.parents)
temp_list.remove(n_node)
m_node.parents = tuple(temp_list)
if not m_node.parents:
roots.append(m_node)
print(ordering_name) # print ordering by name
# gets ordering of nodes of the original instance
ordering = []
for name in ordering_name:
for node in network.nodes:
if node.name == name:
ordering.append(node)
return tuple(ordering)
两个问题:第一,当network
很大时,深拷贝会很耗资源;其次,我想改进我的嵌套 for
循环,它获取原始实例的顺序。 (第二次我想到像 sorted
方法等的东西。)
有什么建议吗?
我将建议算法的一个不那么直白的实现:你根本不需要操纵 DAG,你只需要操纵 info about DAG .该算法唯一需要 "interesting" 的东西是从一个节点到它的子节点的映射(与您的 DAG 实际存储的相反),以及每个节点的父节点的数量。
这些很容易计算,并且可以使用字典将此信息与节点名称相关联(假设所有名称都是不同的 - 如果不是,您可以使用更多代码来发明唯一名称)。
那么这应该可行:
def topsort(dag):
name2node = {node.name: node for node in dag.nodes}
# map name to number of predecessors (parents)
name2npreds = {}
# map name to list of successors (children)
name2succs = {name: [] for name in name2node}
for node in dag.nodes:
thisname = node.name
name2npreds[thisname] = len(node.parents)
for p in node.parents:
name2succs[p.name].append(thisname)
result = [n for n, npreds in name2npreds.items() if npreds == 0]
for p in result:
for c in name2succs[p]:
npreds = name2npreds[c]
assert npreds
npreds -= 1
name2npreds[c] = npreds
if npreds == 0:
result.append(c)
if len(result) < len(name2node):
raise ValueError("no topsort - cycle")
return tuple(name2node[p] for p in result)
这里有一个微妙之处:外层循环附加到 result
而 它迭代 result
。那是故意的。效果是 result
中的每个元素都被外循环恰好处理一次,无论元素是在初始 result
中还是后来添加的。
请注意,在遍历输入 DAG
和 Node
时,不会更改其中的任何内容。
Kahn在62中提出了一个算法topologically sort任意DAG(有向无环图),伪代码复制自维基百科:
L ← Empty list that will contain the sorted elements
S ← Set of all nodes with no incoming edges
while S is non-empty do
remove a node n from S
add n to tail of L
for each node m with an edge e from n to m do
remove edge e from the graph # This is a DESTRUCTIVE step!
if m has no other incoming edges then
insert m into S if graph has edges then
return error (graph has at least one cycle) else
return L (a topologically sorted order)
我需要使用 IPython3 实现它,DAG 的实现如下:
class Node(object):
def __init__(self, name, parents):
assert isinstance(name, str)
assert all(isinstance(_, RandomVariable) for _ in parents)
self.name, self.parents = name, parents
其中 name
是节点的标签,parents
存储其所有父节点。那么DAGclass实现为:
class DAG(object):
def __init__(self, *nodes):
assert all(isinstance(_, Node) for _ in nodes)
self.nodes = nodes
(DAG 实现是固定的,不会改进。) 然后我需要将 Kahn 的算法实现为一个函数 top_order
,它接受一个 DAG 实例和returns 像 (node_1, node_2, ..., node_n)
这样的排序。主要问题是,该算法具有破坏性,因为它的步骤之一是 remove edge e from the graph
(第 5 行),它将删除 m.parents
的一个成员。但是,我必须完整保留 DAG 实例。
到目前为止我能想到的一种方法是创建一个 deep 的 DAG 实例副本(即使是浅副本也不能完成这项工作,因为算法仍然通过引用销毁原始实例),并对这个副本执行破坏算法,然后得到这个副本的节点名称的正确排序(假设节点之间没有命名冲突),然后使用这个名称排序来推断原始实例节点的正确排序,大致如下:
def top_order(network):
'''takes in a DAG, prints and returns a topological ordering.'''
assert type(network) == DAG
temp = copy.deepcopy(network) # to leave the original instance intact
ordering_name = []
roots = [node for node in temp.nodes if not node.parents]
while roots:
n_node = roots[0]
del roots[0]
ordering_name.append(n_node.name)
for m_node in temp.nodes:
if n_node in m_node.parents:
temp_list = list(m_node.parents)
temp_list.remove(n_node)
m_node.parents = tuple(temp_list)
if not m_node.parents:
roots.append(m_node)
print(ordering_name) # print ordering by name
# gets ordering of nodes of the original instance
ordering = []
for name in ordering_name:
for node in network.nodes:
if node.name == name:
ordering.append(node)
return tuple(ordering)
两个问题:第一,当network
很大时,深拷贝会很耗资源;其次,我想改进我的嵌套 for
循环,它获取原始实例的顺序。 (第二次我想到像 sorted
方法等的东西。)
有什么建议吗?
我将建议算法的一个不那么直白的实现:你根本不需要操纵 DAG,你只需要操纵 info about DAG .该算法唯一需要 "interesting" 的东西是从一个节点到它的子节点的映射(与您的 DAG 实际存储的相反),以及每个节点的父节点的数量。
这些很容易计算,并且可以使用字典将此信息与节点名称相关联(假设所有名称都是不同的 - 如果不是,您可以使用更多代码来发明唯一名称)。
那么这应该可行:
def topsort(dag):
name2node = {node.name: node for node in dag.nodes}
# map name to number of predecessors (parents)
name2npreds = {}
# map name to list of successors (children)
name2succs = {name: [] for name in name2node}
for node in dag.nodes:
thisname = node.name
name2npreds[thisname] = len(node.parents)
for p in node.parents:
name2succs[p.name].append(thisname)
result = [n for n, npreds in name2npreds.items() if npreds == 0]
for p in result:
for c in name2succs[p]:
npreds = name2npreds[c]
assert npreds
npreds -= 1
name2npreds[c] = npreds
if npreds == 0:
result.append(c)
if len(result) < len(name2node):
raise ValueError("no topsort - cycle")
return tuple(name2node[p] for p in result)
这里有一个微妙之处:外层循环附加到 result
而 它迭代 result
。那是故意的。效果是 result
中的每个元素都被外循环恰好处理一次,无论元素是在初始 result
中还是后来添加的。
请注意,在遍历输入 DAG
和 Node
时,不会更改其中的任何内容。