在 Python 中将无向循环图 (UCG) 转换为有向无环图 (DAG) 的最快方法?
Fastest way to convert an undirected cyclic graph (UCG) to a directed acyclic graph (DAG) in Python?
假设我有一个无向循环图 (UCG)。所有边的权值都是1。因此,这个UCG可以用邻接矩阵A
:
来表示
import numpy as np
A = np.array([[0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 1],
[1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 0],
[1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1],
[1, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1],
[1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0],
[0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0],
[1, 1, 1, 0, 1, 1, 0, 1, 0, 0, 0],
[1, 1, 0, 1, 1, 1, 1, 0, 0, 0, 0],
[0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0],
[0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
[1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0]])
为了可视化 UCG,我们可以通过
将其简单地转换为 networkx.Graph
对象
import networkx as nx
ucg = nx.Graph()
rows, cols = np.where(A == 1)
edges = zip(rows.tolist(), cols.tolist())
ucg.add_edges_from(edges)
UCG 看起来像这样:
我用不同的颜色给节点上色以显示“最小距离”。橙色节点 {8, 9, 10}
是起始节点,绿色节点 {0, 1, 2, 3}
是与起始节点的最小距离为 1 的节点,蓝色节点 {4, 5, 6, 7}
的最小距离为2. 现在我想将其转换为有向无环图 (DAG),箭头从起始节点指向距离为 1 的节点,再指向距离为 2 的节点,依此类推。具有相同“最小距离”的节点之间的边被丢弃。
预期输出是表示 DAG 的字典:
d = {8: {1, 3},
9: {1, 2},
10: {0, 2, 3},
0: {4, 6, 7},
1: {5, 6, 7},
2: {4, 5, 6},
3: {4, 5, 7}}
同样,为了可视化DAG,我们可以通过
将其转换为networkx.DiGraph
对象
dag = nx.DiGraph()
dag.add_edges_from([(k, v) for k, vs in d.items() in for v in vs])
预期的输出 DAG 如下所示:
我想编写一个高效通用的代码,将具有给定起始节点的给定 UCG 转换为相应的 DAG。
我试过的
显然,需要递归。我的想法是使用 BFS 方法为每个起始节点找到 1 距离节点,然后是它们的 1 距离节点,递归不断进行。所有访问过的节点都存储在一个集合 prev_starts
中,以避免倒退。下面是我的代码
from collections import defaultdict
def ucg2dag(A, starts):
"""Takes the adjacency matrix of a UCG and the indices of the
starting nodes, returns the dictionary of a DAG."""
def recur(starts):
starts = list(set(starts))
idxs, nbrs = np.where(A[starts] == 1)
prev_starts.update(starts)
# Filter out the neighbors that are previous starts so the
# arrows do not point backwards
try:
idxs, nbrs = zip(*((idx, nbr) for idx, nbr in zip(idxs, nbrs)
if nbr not in prev_starts))
# Terminate if every neighbor is a previous start.
except:
return d
for idx, nbr in zip(idxs, nbrs):
d[starts[idx]].add(nbr)
return recur(starts=nbrs)
prev_starts = set()
d = defaultdict(set)
return recur(starts)
正在测试我的代码:
d = ucg2dag(A, starts={8, 9, 10})
print(d)
编辑:由于@trincot 的评论,在 recur
之前添加 return
之后,我能够得到正确的输出:
defaultdict(<class 'set'>,
{8: {1, 3},
9: {1, 2},
10: {0, 2, 3},
0: {4, 6, 7},
1: {5, 6, 7},
2: {4, 5, 6},
3: {4, 5, 7}})
%timeit 37.6 µs ± 591 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
实际上我有一个更大的图表。我想知道有没有更高效的算法?
您已对您的代码应用了一些修复(部分基于注释),因此您现在有了工作代码。
仅剩的几条评论是:
BFS 通常不是递归算法(与 DFS 相比):您拥有的递归是尾递归的一种情况。在那种情况下,它可以写成一个循环,你将避免使用堆栈。
很遗憾你必须在邻接矩阵中查找边。最好先把邻接矩阵转成邻接表,除非图真的很稠密
输出可能也是一个邻接表,每个节点都有一个条目,这样就可以了可以是列表的列表而不是字典
使用zip
重复转换结构可能不是最有效的(虽然我没有进行基准测试)
如果不使用 numpy,它可能看起来像这样:
def ucg2dag(adj_matrix, starts):
adj_list = [
[target for target, is_connected in enumerate(row) if is_connected]
for row in adj_matrix
]
frontier = starts
dag = [[] for _ in range(len(adj_list))]
while frontier:
for source in frontier:
dag[source].extend(target for target in adj_list[source] if not target in starts)
frontier = set(target
for source in frontier for target in adj_list[source] if not target in starts
)
starts.update(frontier)
return dag
示例运行:
adj_matrix = [[0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 1],
[1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 0],
[1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1],
[1, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1],
[1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0],
[0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0],
[1, 1, 1, 0, 1, 1, 0, 1, 0, 0, 0],
[1, 1, 0, 1, 1, 1, 1, 0, 0, 0, 0],
[0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0],
[0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
[1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0]]
dag = ucg2dag(adj_matrix, {8, 9, 10})
print(dag)
示例运行的输出:
[[4, 6, 7], [5, 6, 7], [4, 5, 6], [4, 5, 7], [], [], [], [], [1, 3], [1, 2], [0, 2, 3]]
假设我有一个无向循环图 (UCG)。所有边的权值都是1。因此,这个UCG可以用邻接矩阵A
:
import numpy as np
A = np.array([[0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 1],
[1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 0],
[1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1],
[1, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1],
[1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0],
[0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0],
[1, 1, 1, 0, 1, 1, 0, 1, 0, 0, 0],
[1, 1, 0, 1, 1, 1, 1, 0, 0, 0, 0],
[0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0],
[0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
[1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0]])
为了可视化 UCG,我们可以通过
将其简单地转换为networkx.Graph
对象
import networkx as nx
ucg = nx.Graph()
rows, cols = np.where(A == 1)
edges = zip(rows.tolist(), cols.tolist())
ucg.add_edges_from(edges)
UCG 看起来像这样:
我用不同的颜色给节点上色以显示“最小距离”。橙色节点 {8, 9, 10}
是起始节点,绿色节点 {0, 1, 2, 3}
是与起始节点的最小距离为 1 的节点,蓝色节点 {4, 5, 6, 7}
的最小距离为2. 现在我想将其转换为有向无环图 (DAG),箭头从起始节点指向距离为 1 的节点,再指向距离为 2 的节点,依此类推。具有相同“最小距离”的节点之间的边被丢弃。
预期输出是表示 DAG 的字典:
d = {8: {1, 3},
9: {1, 2},
10: {0, 2, 3},
0: {4, 6, 7},
1: {5, 6, 7},
2: {4, 5, 6},
3: {4, 5, 7}}
同样,为了可视化DAG,我们可以通过
将其转换为networkx.DiGraph
对象
dag = nx.DiGraph()
dag.add_edges_from([(k, v) for k, vs in d.items() in for v in vs])
预期的输出 DAG 如下所示:
我想编写一个高效通用的代码,将具有给定起始节点的给定 UCG 转换为相应的 DAG。
我试过的
显然,需要递归。我的想法是使用 BFS 方法为每个起始节点找到 1 距离节点,然后是它们的 1 距离节点,递归不断进行。所有访问过的节点都存储在一个集合 prev_starts
中,以避免倒退。下面是我的代码
from collections import defaultdict
def ucg2dag(A, starts):
"""Takes the adjacency matrix of a UCG and the indices of the
starting nodes, returns the dictionary of a DAG."""
def recur(starts):
starts = list(set(starts))
idxs, nbrs = np.where(A[starts] == 1)
prev_starts.update(starts)
# Filter out the neighbors that are previous starts so the
# arrows do not point backwards
try:
idxs, nbrs = zip(*((idx, nbr) for idx, nbr in zip(idxs, nbrs)
if nbr not in prev_starts))
# Terminate if every neighbor is a previous start.
except:
return d
for idx, nbr in zip(idxs, nbrs):
d[starts[idx]].add(nbr)
return recur(starts=nbrs)
prev_starts = set()
d = defaultdict(set)
return recur(starts)
正在测试我的代码:
d = ucg2dag(A, starts={8, 9, 10})
print(d)
编辑:由于@trincot 的评论,在 recur
之前添加 return
之后,我能够得到正确的输出:
defaultdict(<class 'set'>,
{8: {1, 3},
9: {1, 2},
10: {0, 2, 3},
0: {4, 6, 7},
1: {5, 6, 7},
2: {4, 5, 6},
3: {4, 5, 7}})
%timeit 37.6 µs ± 591 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
实际上我有一个更大的图表。我想知道有没有更高效的算法?
您已对您的代码应用了一些修复(部分基于注释),因此您现在有了工作代码。
仅剩的几条评论是:
BFS 通常不是递归算法(与 DFS 相比):您拥有的递归是尾递归的一种情况。在那种情况下,它可以写成一个循环,你将避免使用堆栈。
很遗憾你必须在邻接矩阵中查找边。最好先把邻接矩阵转成邻接表,除非图真的很稠密
输出可能也是一个邻接表,每个节点都有一个条目,这样就可以了可以是列表的列表而不是字典
使用
zip
重复转换结构可能不是最有效的(虽然我没有进行基准测试)
如果不使用 numpy,它可能看起来像这样:
def ucg2dag(adj_matrix, starts):
adj_list = [
[target for target, is_connected in enumerate(row) if is_connected]
for row in adj_matrix
]
frontier = starts
dag = [[] for _ in range(len(adj_list))]
while frontier:
for source in frontier:
dag[source].extend(target for target in adj_list[source] if not target in starts)
frontier = set(target
for source in frontier for target in adj_list[source] if not target in starts
)
starts.update(frontier)
return dag
示例运行:
adj_matrix = [[0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 1],
[1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 0],
[1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1],
[1, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1],
[1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0],
[0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0],
[1, 1, 1, 0, 1, 1, 0, 1, 0, 0, 0],
[1, 1, 0, 1, 1, 1, 1, 0, 0, 0, 0],
[0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0],
[0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
[1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0]]
dag = ucg2dag(adj_matrix, {8, 9, 10})
print(dag)
示例运行的输出:
[[4, 6, 7], [5, 6, 7], [4, 5, 6], [4, 5, 7], [], [], [], [], [1, 3], [1, 2], [0, 2, 3]]