python 中 A* 实现的优化

Optimization of A* implementation in python

为了解决project euler的问题83,我尝试使用A*算法。该算法适用于给定的问题,我得到了正确的结果。但是当我将算法可视化时,我意识到算法似乎检查了许多可能的节点。是因为我没有正确实现算法还是我遗漏了其他东西?我尝试使用两种不同的启发式函数,您可以在下面的代码中看到它们,但输出没有太大变化。

如果有任何提高代码效率的技巧,我也将不胜感激,因为我是一个初学者。

import heapq
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from matplotlib import animation
import numpy as np

class PriorityQueue:
    def __init__(self):
        self.elements = []

    def empty(self):
        return not self.elements

    def put(self, item, priority):
        heapq.heappush(self.elements, (priority, item))

    def get(self):
        return heapq.heappop(self.elements)[1]

class A_star:
    def __init__(self, data, start, end):
        self.data = data
        self.start = start
        self.end = end
        self.a = len(self.data)
        self.b = len(self.data[0])

    def h_matrix(self):
        elements = sorted([self.data[i][j] for j in range(self.b) for i in range(self.a)])
        n = self.a + self.b - 1
        minimum = elements[:n]
        h = []
        for i in range(self.a):
            h_i = []
            for j in range(self.b):
                h_i.append(sum(minimum[:(n-i-j-1)]))
            h.append(h_i)
        return h

    def astar(self):
        h = self.h_matrix()
        open_list = PriorityQueue()
        open_list.put(self.start, 0)
        came_from = {}
        cost_so_far = {}
        came_from[self.start] = None
        cost_so_far[self.start] = self.data[0][0]
        checked = []

        while not open_list.empty():
            current = open_list.get()
            checked.append(current)

            if current == self.end:
                break

            neighbors = [(current[0]+x, current[1]+y) for x, y in {(-1,0), (0,-1), (1,0), (0,1)}
                if 0 <= current[0]+x < self.a and 0 <= current[1]+y < self.b]
            for next in neighbors:
                new_cost = cost_so_far[current] + self.data[next[0]][next[1]]
                if next not in cost_so_far or new_cost < cost_so_far[next]:
                    cost_so_far[next] = new_cost
                    priority = new_cost + h[next[0]][next[1]]
                    open_list.put(next, priority)
                    came_from[next] = current

        return came_from, checked, cost_so_far[self.end]

    def reconstruct_path(self):
        paths = self.astar()[0]
        best_path = [self.end]
        while best_path[0] is not None:
            new = paths[best_path[0]]
            best_path.insert(0, new)
        return best_path[1:]

    def minimum(self):
        return self.astar()[2]


if __name__ == "__main__":
    liste = [[131, 673, 234, 103, 18], [201, 96, 342, 965, 150], [630, 803, 746, 422, 111], [537, 699, 497, 121, 956], [805, 732, 524, 37, 331]]
    path = A_star(liste, (0,0), (4,4))
    print(path.astar())
    #print(path.reconstruct_path())
    path.plot_path(speed=200)

在这里你可以看到我对问题中给出的 80x80 矩阵的可视化。蓝色是所有选中的点,红色是最佳路径。根据我的理解,矩阵中的每个点都不应该被选中,即蓝色。 https://i.stack.imgur.com/LKkdh.png

我最初的猜测是我的启发式函数不够好。如果我选择 h=0,这意味着 Dijkstra 算法我的检查列表的长度是 6400。相反,如果我使用我的自定义 h,长度是 6455。但是我如何改进任意矩阵的启发式函数? 感谢您的帮助。

让我从你的结尾开始 post:

将单元格标记为选中

if I use my custom h the length is 6455.

您的大小 checked 不应超过单元格数。因此,让我首先建议对此进行改进:不使用列表,而是使用集合,并跳过从优先级队列中弹出的任何已经存在于集合中的内容。相关代码将如下所示:

        checked = set()

        while not open_list.empty():
            current = open_list.get()
            if current in checked:  # don't repeat the same search
                continue
            checked.add(current)

如果最后你需要 checked 的列表版本,就 return 那样:

        return came_from, list(checked), cost_so_far[self.end]

现在进入主要问题:

改进启发式功能

From my understanding it shouldn't be the case that every point in the matrix is in checked i.e. blue. My initial guess would be that my heuristic function is not good enough.

这是正确的解释。结合给定矩阵具有总成本非常接近的路径这一事实,因此存在涉及矩阵大部分的竞争领域。

how can I improve the heuristic function for an arbitrary matrix?

一个想法是考虑以下内容。路径必须至少包含来自 each“前向”对角线 (/) 的一个元素。因此,如果我们使用每个这样的对角线上的最小值并创建 运行 总和(向后——从目标开始),我们将得到 h.

的可行值

这是该想法的代码:

    def h_matrix(self):
        min_diagonals = [float("inf")] * (self.a + self.b - 1)
        # For each diagonal get the minimum cost on that diagonal
        for i, row in enumerate(self.data):
            for j, cost in enumerate(row):
                min_diagonals[i+j] = min(min_diagonals[i+j], cost)
        # Create a running sum in backward direction
        for i in range(len(min_diagonals) - 2, -1, -1):
            min_diagonals[i] += min_diagonals[i + 1] 
        min_diagonals.append(0)  # Add an entry for allowing the next logic to work
        # These sums are a lower bound for the remaining cost towards the target 
        h = [
            [min_diagonals[i + j + 1] for j in range(self.b)] 
            for i in range(self.a)
        ]
        return h

通过这些改进,我们得到了这些计数:

len(cost_so_far) == 6374
len(checked) == 6339

这仍然代表矩阵的很大一部分,但至少遗漏了一些单元格。