DFS 查找所有可能的路径非常慢
DFS to find all possible path is very slow
我编写了类似 DFS 的算法来查找从零级别开始的所有可能路径。
有 2,000 个节点和 5,000 个边,下面的代码执行速度非常慢。
对这个算法有什么建议吗?
all_path = []
def printAllPathsUntil(s, path):
path.append(s)
if s not in adj or len(adj[s]) <= 0:
all_path.append(path[:]) # EDIT2
else:
for i in adj[s]:
printAllPathsUntil(i, path)
path.pop()
for point in points_in_start:
path = []
printAllPathsUntil(point, path)
并且 adj
保持边缘;起始位置作为键,目标列表作为值。
points_in_start = [0, 3, 7]
adj = {0: [1, 8],
1: [2, 5],
2: [],
3: [2, 4],
4: [],
5: [6],
6: [],
7: [6],
8: [2]
}
编辑1
- 这是一个DAG。没有循环。
你的算法的问题是它会做很多重复的工作。在您的示例中情况并非如此,因为只有当一个节点被两个其他节点到达时,它是一个叶节点,如 C
,但成像从 D
到 [=13] 的边缘=]:这意味着再次访问了从B
开始的整个子图!对于具有 2000 个节点的图,这将导致速度显着下降。
为了解决这个问题,您可以使用记忆,但这意味着您必须重组算法,而不是添加到现有的 path
,然后将 path
添加到 all_paths
,它必须 return
从当前节点开始的(部分)路径,并将这些路径与父节点组合成完整路径。然后,当您再次从另一个节点访问 B
时,您可以使用 functools.lru_cache
重新使用所有这些部分结果。
from functools import lru_cache
@lru_cache(None)
def getAllPathsUntil(s):
if s not in adj or not adj[s]:
return [ [s] ]
else:
return [ [s, *p] for a in adj[s]
for p in getAllPathsUntil(a)]
all_paths = []
for point in points_in_start:
all_paths.extend(getAllPathsUntil(point))
正如评论和其他答案中已经指出的那样,记住先前访问过的节点的下游路径是一个优化领域。
这是我尝试实现的。
这里,downstream_paths
是一个字典,我们在其中记住,对于每个访问过的非叶节点,该节点的下游路径。
我在最后提到了 %%timeit
个小测试用例的结果,其中包含一个“重访非叶子”的小用例。由于我的测试用例只有一个非叶节点被重新访问的情况,因此改进很小。也许在你的大规模数据集中,性能上会有更大的差距。
输入数据:
points_in_start = [0, 3, 7]
adj = {0: [1, 8],
1: [2, 5],
2: [],
3: [2, 4],
4: [],
5: [6],
6: [],
7: [6],
8: [2], # Non-leaf node "2" is a child of both "8" and "3"
2:[10],
10:[11,18],
11:[12,15],
12:[],
15:[16],
16:[],
18:[12]
}
修改后的代码:
%%timeit
downstream_paths = {} # Maps each node to its
# list of downstream paths
# starting with that node.
def getPathsToLeafsFrom(s): # Returns list of downstream paths starting from s
# and ending in some leaf node.
children = adj.get(s, [])
if not children: # s is a Leaf
paths_from_s = [[s]]
else: # s is a Non-leaf
ds_paths = downstream_paths.get(s, []) # Check if s was previously visited
if ds_paths: # If s was previously visited.
paths_from_s = ds_paths
else: # s was not visited earlier.
paths_from_s = [] # Initialize
for child in children:
paths_from_child = getPathsToLeafsFrom(child) # Recurse for each child
for p in paths_from_child:
paths_from_s.append([s] + p)
downstream_paths[s] = paths_from_s # Cache this, to use when s is re-visited
return paths_from_s
path = []
for point in points_in_start:
path.extend(getPathsToLeafsFrom(point))
输出:
from pprint import pprint
pprint (all_path)
[[0, 1, 2, 10, 11, 12],
[0, 1, 2, 10, 11, 15, 16],
[0, 1, 2, 10, 18, 12],
[0, 1, 5, 6],
[0, 8, 2, 10, 11, 12],
[0, 8, 2, 10, 11, 15, 16],
[0, 8, 2, 10, 18, 12],
[3, 2, 10, 11, 12],
[3, 2, 10, 11, 15, 16],
[3, 2, 10, 18, 12],
[3, 4],
[7, 6]]
计时结果:原贴代码:
10000 loops, best of 3: 63 µs per loop
计时结果:优化代码:
10000 loops, best of 3: 43.2 µs per loop
我编写了类似 DFS 的算法来查找从零级别开始的所有可能路径。 有 2,000 个节点和 5,000 个边,下面的代码执行速度非常慢。 对这个算法有什么建议吗?
all_path = []
def printAllPathsUntil(s, path):
path.append(s)
if s not in adj or len(adj[s]) <= 0:
all_path.append(path[:]) # EDIT2
else:
for i in adj[s]:
printAllPathsUntil(i, path)
path.pop()
for point in points_in_start:
path = []
printAllPathsUntil(point, path)
并且 adj
保持边缘;起始位置作为键,目标列表作为值。
points_in_start = [0, 3, 7]
adj = {0: [1, 8],
1: [2, 5],
2: [],
3: [2, 4],
4: [],
5: [6],
6: [],
7: [6],
8: [2]
}
编辑1
- 这是一个DAG。没有循环。
你的算法的问题是它会做很多重复的工作。在您的示例中情况并非如此,因为只有当一个节点被两个其他节点到达时,它是一个叶节点,如 C
,但成像从 D
到 [=13] 的边缘=]:这意味着再次访问了从B
开始的整个子图!对于具有 2000 个节点的图,这将导致速度显着下降。
为了解决这个问题,您可以使用记忆,但这意味着您必须重组算法,而不是添加到现有的 path
,然后将 path
添加到 all_paths
,它必须 return
从当前节点开始的(部分)路径,并将这些路径与父节点组合成完整路径。然后,当您再次从另一个节点访问 B
时,您可以使用 functools.lru_cache
重新使用所有这些部分结果。
from functools import lru_cache
@lru_cache(None)
def getAllPathsUntil(s):
if s not in adj or not adj[s]:
return [ [s] ]
else:
return [ [s, *p] for a in adj[s]
for p in getAllPathsUntil(a)]
all_paths = []
for point in points_in_start:
all_paths.extend(getAllPathsUntil(point))
正如评论和其他答案中已经指出的那样,记住先前访问过的节点的下游路径是一个优化领域。
这是我尝试实现的。
这里,downstream_paths
是一个字典,我们在其中记住,对于每个访问过的非叶节点,该节点的下游路径。
我在最后提到了 %%timeit
个小测试用例的结果,其中包含一个“重访非叶子”的小用例。由于我的测试用例只有一个非叶节点被重新访问的情况,因此改进很小。也许在你的大规模数据集中,性能上会有更大的差距。
输入数据:
points_in_start = [0, 3, 7]
adj = {0: [1, 8],
1: [2, 5],
2: [],
3: [2, 4],
4: [],
5: [6],
6: [],
7: [6],
8: [2], # Non-leaf node "2" is a child of both "8" and "3"
2:[10],
10:[11,18],
11:[12,15],
12:[],
15:[16],
16:[],
18:[12]
}
修改后的代码:
%%timeit
downstream_paths = {} # Maps each node to its
# list of downstream paths
# starting with that node.
def getPathsToLeafsFrom(s): # Returns list of downstream paths starting from s
# and ending in some leaf node.
children = adj.get(s, [])
if not children: # s is a Leaf
paths_from_s = [[s]]
else: # s is a Non-leaf
ds_paths = downstream_paths.get(s, []) # Check if s was previously visited
if ds_paths: # If s was previously visited.
paths_from_s = ds_paths
else: # s was not visited earlier.
paths_from_s = [] # Initialize
for child in children:
paths_from_child = getPathsToLeafsFrom(child) # Recurse for each child
for p in paths_from_child:
paths_from_s.append([s] + p)
downstream_paths[s] = paths_from_s # Cache this, to use when s is re-visited
return paths_from_s
path = []
for point in points_in_start:
path.extend(getPathsToLeafsFrom(point))
输出:
from pprint import pprint
pprint (all_path)
[[0, 1, 2, 10, 11, 12],
[0, 1, 2, 10, 11, 15, 16],
[0, 1, 2, 10, 18, 12],
[0, 1, 5, 6],
[0, 8, 2, 10, 11, 12],
[0, 8, 2, 10, 11, 15, 16],
[0, 8, 2, 10, 18, 12],
[3, 2, 10, 11, 12],
[3, 2, 10, 11, 15, 16],
[3, 2, 10, 18, 12],
[3, 4],
[7, 6]]
计时结果:原贴代码:
10000 loops, best of 3: 63 µs per loop
计时结果:优化代码:
10000 loops, best of 3: 43.2 µs per loop