K-最大元素算法比较

K-Largest elements Algorithm Comparison

我正在尝试从大型数组中提取 k 个最大的元素并比较以下 2 种算法

  1. 最小堆方法。 T(n) = k + nlogk
  2. 排序方法。 T(n) = nlogn

但由于某种原因,算法 2 的性能更好。谁能解释一下为什么

import math


class MinHeap:
    def __init__(self, arr):
        self.__items = arr
        self.build_heap()

    def size(self):
        return len(self.__items)
def items(self):
    return self.__items

def build_heap(self):
    for node_number in xrange(int(len(self.__items) / 2), 0, -1):
        self.heapify(node_number)

def heapify(self, node_number):
    # return if leave node
    if node_number > int(len(self.__items) / 2):
        return

    node = self.__items[node_number-1]
    left_child = self.__items[(2 * node_number)-1] if (((2 * node_number)-1) < len(self.__items)) else None 

    right_child = self.__items[(2 * node_number + 1)-1] if (((2 * node_number + 1)-1) < len(self.__items)) else None        

    min_node = node

    if left_child != None and right_child != None:
        min_node = min(node, left_child, right_child)
    elif left_child != None :
        min_node = min(node, left_child)
    elif right_child != None :
        min_node = min(node, right_child)

    if min_node == node:
        return
    elif left_child!=None and min_node == left_child:

        self.__items[node_number - 1], self.__items[(2 * node_number)-1] = self.__items[(2 * node_number)-1], self.__items[node_number - 1]
        self.heapify(2 * node_number)
    elif right_child!=None and min_node == right_child:

        self.__items[node_number - 1], self.__items[(2 * node_number + 1)-1] = self.__items[(2 * node_number + 1)-1], self.__items[node_number - 1]
        self.heapify(2 * node_number + 1)

def extract_min(self):
    length = len(self.__items)
    if length == 0:
        return
    self.__items[0], self.__items[length-1] = self.__items[length-1], self.__items[0]
    min_element =  self.__items.pop()
    self.heapify(1);
    return min_element

def insert(self, num):
    self.__items.append(num)
    current_node = len(self.__items)
    parent_node = int(current_node / 2)
    while current_node > 1:
        min_node = min(self.__items[current_node-1], self.__items[parent_node-1])
        if min_node == self.__items[parent_node-1]:
            break
        self.__items[current_node-1], self.__items[parent_node-1] = self.__items[parent_node-1], self.__items[current_node-1]
        current_node = parent_node
        parent_node = int(current_node / 2)

# Comparing Algorithms ::::::::::::::::::
import time
import random
from min_heap import *

numbers = random.sample(range(100000), 50000)

k = 3
n = len(numbers)

# k Largest element using Heap T(n) = k + nlogk
start = time.time()
my_heap = MinHeap(numbers[:k+1])

for number in numbers[k+1:]:
    my_heap.extract_min()
    my_heap.insert(number)

data =  sorted(my_heap.items(),reverse=True)
print data[:len(data)-1]
end = time.time()
print "Took {} seconds".format(end-start)


# k Largest element using sorting T(n) = nlogn
start = time.time()
sorted_arr = sorted(numbers, reverse=True)
print sorted_arr[:k]
end = time.time()
print "Took {} seconds".format(end-start)

这是我得到的输出:

算法 1

[99999, 99998, 99997]
Took 0.15064406395 seconds

算法 2

[99999, 99998, 99997]
Took 0.0120780467987 seconds

答案分为两部分:

  1. 实用方面:

    调用用户定义的函数通常很慢,不断增长的列表需要重新分配内存等。None 计算在“sort() + [:k]”方法中。 Python 如果作者不注意内存管理的细节,代码可能会很慢。因此,您正在衡量实施差异和算法差异。

  2. 理论方面:

    我不知道你是怎么想出 T(n) = k + nlogk 的最小堆方法的,但我看到你调用了 my_heap.insert()my_heap.extract_min()(触发 heapify())几乎 n 次。两者都是n的对数,所以看起来更像nlog(n)


这是分析器的输出 运行:

$ python -m cProfile -s cumtime h.py | head -n 20
[99999, 99998, 99997]
Took 0.255105018616 seconds
[99999, 99998, 99997]
Took 0.0103080272675 seconds
         800007 function calls (750010 primitive calls) in 0.291 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.022    0.022    0.291    0.291 h.py:1(<module>)
    49996    0.034    0.000    0.144    0.000 h.py:47(extract_min)
99995/49998  0.088    0.000    0.103    0.000 h.py:17(heapify)
    49996    0.076    0.000    0.090    0.000 h.py:56(insert)
        1    0.018    0.018    0.021    0.021 random.py:293(sample)
   149975    0.015    0.000    0.015    0.000 {min}
   299987    0.014    0.000    0.014    0.000 {len}
        2    0.010    0.005    0.010    0.005 {sorted}
    49996    0.004    0.000    0.004    0.000 {method 'pop' of 'list' objects}
    49996    0.003    0.000    0.003    0.000 {method 'append' of 'list' objects}
    50000    0.003    0.000    0.003    0.000 {method 'random' of '_random.Random' objects}

理论界限是:

  • 排序:O(n log n)
  • 堆select离子:O(n log k)
  • quick select: O(n)

您的堆 selection 方法的一个问题是您正在创建整个项目集的堆。您实际上只需要创建前 k 项的堆。算法是:

h = create a min heap of first k items // O(k)
for each remaining item
    if item is larger than smallest item on heap
        remove smallest item from heap
        add new item to heap

该算法的最坏情况是 O(n log k)。这个想法是,在最坏的情况下,您最终会从大小为 k 的堆中进行 n 次插入和 n 次删除。在平均情况下,插入和删除的次数要少得多。这取决于 k 与 n 的关系。如果k远小于n,这个方法可以非常快。

在您的代码中,您正在创建前 k 个项目的最小堆,然后您有这个循环:

for number in numbers[k+1:]:
    my_heap.extract_min()
    my_heap.insert(number)

该代码不会可靠地产生正确的结果,因为您无条件地删除了最小元素。因此,如果堆已经包含前 k 个元素,但您仍然有数字,它将用较小的元素替换较大的元素。您需要修改您的代码:

for number in numbers[k+1:]:
    if (number < my_heap.get_min())  // get_min returns the minimum element, but doesn't remove it
        my_heap.extract_min()
        my_heap.insert(number)

这不仅会给你正确的答案,而且会减少你在一般情况下必须进行的插入和删除的次数,从而加快你的程序。

您的堆实现也不是最佳的。使 heapify 函数迭代而不是递归,您将节省大量时间,并且您有足够的机会优化该函数。

正如我上面提到的,quick select 是一个 O(n) 算法。通常,当 k 大于 n 的大约 2% 时,quick select 更快。因此,如果您要 select 从 1,000 项列表中选择 100 项,快速 select 几乎肯定会更快。如果您要从 1,000,000 的列表中 selecting 100,那么堆 selection 方法应该更快。

排序不应该比快速 select 快,也不应该比堆 selection 快,除非你 select 的百分比非常大(超过 60%,我预计)的项目。

几年前我对此做了一些相当详细的分析。看我的博客 post, When theory meets practice.