K-最大元素算法比较
K-Largest elements Algorithm Comparison
我正在尝试从大型数组中提取 k 个最大的元素并比较以下 2 种算法
- 最小堆方法。
T(n) = k + nlogk
- 排序方法。
T(n) = nlogn
但由于某种原因,算法 2 的性能更好。谁能解释一下为什么
import math
class MinHeap:
def __init__(self, arr):
self.__items = arr
self.build_heap()
def size(self):
return len(self.__items)
def items(self):
return self.__items
def build_heap(self):
for node_number in xrange(int(len(self.__items) / 2), 0, -1):
self.heapify(node_number)
def heapify(self, node_number):
# return if leave node
if node_number > int(len(self.__items) / 2):
return
node = self.__items[node_number-1]
left_child = self.__items[(2 * node_number)-1] if (((2 * node_number)-1) < len(self.__items)) else None
right_child = self.__items[(2 * node_number + 1)-1] if (((2 * node_number + 1)-1) < len(self.__items)) else None
min_node = node
if left_child != None and right_child != None:
min_node = min(node, left_child, right_child)
elif left_child != None :
min_node = min(node, left_child)
elif right_child != None :
min_node = min(node, right_child)
if min_node == node:
return
elif left_child!=None and min_node == left_child:
self.__items[node_number - 1], self.__items[(2 * node_number)-1] = self.__items[(2 * node_number)-1], self.__items[node_number - 1]
self.heapify(2 * node_number)
elif right_child!=None and min_node == right_child:
self.__items[node_number - 1], self.__items[(2 * node_number + 1)-1] = self.__items[(2 * node_number + 1)-1], self.__items[node_number - 1]
self.heapify(2 * node_number + 1)
def extract_min(self):
length = len(self.__items)
if length == 0:
return
self.__items[0], self.__items[length-1] = self.__items[length-1], self.__items[0]
min_element = self.__items.pop()
self.heapify(1);
return min_element
def insert(self, num):
self.__items.append(num)
current_node = len(self.__items)
parent_node = int(current_node / 2)
while current_node > 1:
min_node = min(self.__items[current_node-1], self.__items[parent_node-1])
if min_node == self.__items[parent_node-1]:
break
self.__items[current_node-1], self.__items[parent_node-1] = self.__items[parent_node-1], self.__items[current_node-1]
current_node = parent_node
parent_node = int(current_node / 2)
# Comparing Algorithms ::::::::::::::::::
import time
import random
from min_heap import *
numbers = random.sample(range(100000), 50000)
k = 3
n = len(numbers)
# k Largest element using Heap T(n) = k + nlogk
start = time.time()
my_heap = MinHeap(numbers[:k+1])
for number in numbers[k+1:]:
my_heap.extract_min()
my_heap.insert(number)
data = sorted(my_heap.items(),reverse=True)
print data[:len(data)-1]
end = time.time()
print "Took {} seconds".format(end-start)
# k Largest element using sorting T(n) = nlogn
start = time.time()
sorted_arr = sorted(numbers, reverse=True)
print sorted_arr[:k]
end = time.time()
print "Took {} seconds".format(end-start)
这是我得到的输出:
算法 1
[99999, 99998, 99997]
Took 0.15064406395 seconds
算法 2
[99999, 99998, 99997]
Took 0.0120780467987 seconds
答案分为两部分:
实用方面:
调用用户定义的函数通常很慢,不断增长的列表需要重新分配内存等。None 计算在“sort()
+ [:k]
”方法中。 Python 如果作者不注意内存管理的细节,代码可能会很慢。因此,您正在衡量实施差异和算法差异。
理论方面:
我不知道你是怎么想出 T(n) = k + nlogk
的最小堆方法的,但我看到你调用了 my_heap.insert()
和 my_heap.extract_min()
(触发 heapify()
)几乎 n
次。两者都是n
的对数,所以看起来更像nlog(n)
。
这是分析器的输出 运行:
$ python -m cProfile -s cumtime h.py | head -n 20
[99999, 99998, 99997]
Took 0.255105018616 seconds
[99999, 99998, 99997]
Took 0.0103080272675 seconds
800007 function calls (750010 primitive calls) in 0.291 seconds
Ordered by: cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.022 0.022 0.291 0.291 h.py:1(<module>)
49996 0.034 0.000 0.144 0.000 h.py:47(extract_min)
99995/49998 0.088 0.000 0.103 0.000 h.py:17(heapify)
49996 0.076 0.000 0.090 0.000 h.py:56(insert)
1 0.018 0.018 0.021 0.021 random.py:293(sample)
149975 0.015 0.000 0.015 0.000 {min}
299987 0.014 0.000 0.014 0.000 {len}
2 0.010 0.005 0.010 0.005 {sorted}
49996 0.004 0.000 0.004 0.000 {method 'pop' of 'list' objects}
49996 0.003 0.000 0.003 0.000 {method 'append' of 'list' objects}
50000 0.003 0.000 0.003 0.000 {method 'random' of '_random.Random' objects}
理论界限是:
- 排序:O(n log n)
- 堆select离子:O(n log k)
- quick select: O(n)
您的堆 selection 方法的一个问题是您正在创建整个项目集的堆。您实际上只需要创建前 k 项的堆。算法是:
h = create a min heap of first k items // O(k)
for each remaining item
if item is larger than smallest item on heap
remove smallest item from heap
add new item to heap
该算法的最坏情况是 O(n log k)。这个想法是,在最坏的情况下,您最终会从大小为 k 的堆中进行 n 次插入和 n 次删除。在平均情况下,插入和删除的次数要少得多。这取决于 k 与 n 的关系。如果k远小于n,这个方法可以非常快。
在您的代码中,您正在创建前 k 个项目的最小堆,然后您有这个循环:
for number in numbers[k+1:]:
my_heap.extract_min()
my_heap.insert(number)
该代码不会可靠地产生正确的结果,因为您无条件地删除了最小元素。因此,如果堆已经包含前 k 个元素,但您仍然有数字,它将用较小的元素替换较大的元素。您需要修改您的代码:
for number in numbers[k+1:]:
if (number < my_heap.get_min()) // get_min returns the minimum element, but doesn't remove it
my_heap.extract_min()
my_heap.insert(number)
这不仅会给你正确的答案,而且会减少你在一般情况下必须进行的插入和删除的次数,从而加快你的程序。
您的堆实现也不是最佳的。使 heapify
函数迭代而不是递归,您将节省大量时间,并且您有足够的机会优化该函数。
正如我上面提到的,quick select 是一个 O(n) 算法。通常,当 k 大于 n 的大约 2% 时,quick select 更快。因此,如果您要 select 从 1,000 项列表中选择 100 项,快速 select 几乎肯定会更快。如果您要从 1,000,000 的列表中 selecting 100,那么堆 selection 方法应该更快。
排序不应该比快速 select 快,也不应该比堆 selection 快,除非你 select 的百分比非常大(超过 60%,我预计)的项目。
几年前我对此做了一些相当详细的分析。看我的博客 post, When theory meets practice.
我正在尝试从大型数组中提取 k 个最大的元素并比较以下 2 种算法
- 最小堆方法。
T(n) = k + nlogk
- 排序方法。
T(n) = nlogn
但由于某种原因,算法 2 的性能更好。谁能解释一下为什么
import math
class MinHeap:
def __init__(self, arr):
self.__items = arr
self.build_heap()
def size(self):
return len(self.__items)
def items(self):
return self.__items
def build_heap(self):
for node_number in xrange(int(len(self.__items) / 2), 0, -1):
self.heapify(node_number)
def heapify(self, node_number):
# return if leave node
if node_number > int(len(self.__items) / 2):
return
node = self.__items[node_number-1]
left_child = self.__items[(2 * node_number)-1] if (((2 * node_number)-1) < len(self.__items)) else None
right_child = self.__items[(2 * node_number + 1)-1] if (((2 * node_number + 1)-1) < len(self.__items)) else None
min_node = node
if left_child != None and right_child != None:
min_node = min(node, left_child, right_child)
elif left_child != None :
min_node = min(node, left_child)
elif right_child != None :
min_node = min(node, right_child)
if min_node == node:
return
elif left_child!=None and min_node == left_child:
self.__items[node_number - 1], self.__items[(2 * node_number)-1] = self.__items[(2 * node_number)-1], self.__items[node_number - 1]
self.heapify(2 * node_number)
elif right_child!=None and min_node == right_child:
self.__items[node_number - 1], self.__items[(2 * node_number + 1)-1] = self.__items[(2 * node_number + 1)-1], self.__items[node_number - 1]
self.heapify(2 * node_number + 1)
def extract_min(self):
length = len(self.__items)
if length == 0:
return
self.__items[0], self.__items[length-1] = self.__items[length-1], self.__items[0]
min_element = self.__items.pop()
self.heapify(1);
return min_element
def insert(self, num):
self.__items.append(num)
current_node = len(self.__items)
parent_node = int(current_node / 2)
while current_node > 1:
min_node = min(self.__items[current_node-1], self.__items[parent_node-1])
if min_node == self.__items[parent_node-1]:
break
self.__items[current_node-1], self.__items[parent_node-1] = self.__items[parent_node-1], self.__items[current_node-1]
current_node = parent_node
parent_node = int(current_node / 2)
# Comparing Algorithms ::::::::::::::::::
import time
import random
from min_heap import *
numbers = random.sample(range(100000), 50000)
k = 3
n = len(numbers)
# k Largest element using Heap T(n) = k + nlogk
start = time.time()
my_heap = MinHeap(numbers[:k+1])
for number in numbers[k+1:]:
my_heap.extract_min()
my_heap.insert(number)
data = sorted(my_heap.items(),reverse=True)
print data[:len(data)-1]
end = time.time()
print "Took {} seconds".format(end-start)
# k Largest element using sorting T(n) = nlogn
start = time.time()
sorted_arr = sorted(numbers, reverse=True)
print sorted_arr[:k]
end = time.time()
print "Took {} seconds".format(end-start)
这是我得到的输出:
算法 1
[99999, 99998, 99997]
Took 0.15064406395 seconds
算法 2
[99999, 99998, 99997]
Took 0.0120780467987 seconds
答案分为两部分:
实用方面:
调用用户定义的函数通常很慢,不断增长的列表需要重新分配内存等。None 计算在“
sort()
+[:k]
”方法中。 Python 如果作者不注意内存管理的细节,代码可能会很慢。因此,您正在衡量实施差异和算法差异。理论方面:
我不知道你是怎么想出
T(n) = k + nlogk
的最小堆方法的,但我看到你调用了my_heap.insert()
和my_heap.extract_min()
(触发heapify()
)几乎n
次。两者都是n
的对数,所以看起来更像nlog(n)
。
这是分析器的输出 运行:
$ python -m cProfile -s cumtime h.py | head -n 20
[99999, 99998, 99997]
Took 0.255105018616 seconds
[99999, 99998, 99997]
Took 0.0103080272675 seconds
800007 function calls (750010 primitive calls) in 0.291 seconds
Ordered by: cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.022 0.022 0.291 0.291 h.py:1(<module>)
49996 0.034 0.000 0.144 0.000 h.py:47(extract_min)
99995/49998 0.088 0.000 0.103 0.000 h.py:17(heapify)
49996 0.076 0.000 0.090 0.000 h.py:56(insert)
1 0.018 0.018 0.021 0.021 random.py:293(sample)
149975 0.015 0.000 0.015 0.000 {min}
299987 0.014 0.000 0.014 0.000 {len}
2 0.010 0.005 0.010 0.005 {sorted}
49996 0.004 0.000 0.004 0.000 {method 'pop' of 'list' objects}
49996 0.003 0.000 0.003 0.000 {method 'append' of 'list' objects}
50000 0.003 0.000 0.003 0.000 {method 'random' of '_random.Random' objects}
理论界限是:
- 排序:O(n log n)
- 堆select离子:O(n log k)
- quick select: O(n)
您的堆 selection 方法的一个问题是您正在创建整个项目集的堆。您实际上只需要创建前 k 项的堆。算法是:
h = create a min heap of first k items // O(k)
for each remaining item
if item is larger than smallest item on heap
remove smallest item from heap
add new item to heap
该算法的最坏情况是 O(n log k)。这个想法是,在最坏的情况下,您最终会从大小为 k 的堆中进行 n 次插入和 n 次删除。在平均情况下,插入和删除的次数要少得多。这取决于 k 与 n 的关系。如果k远小于n,这个方法可以非常快。
在您的代码中,您正在创建前 k 个项目的最小堆,然后您有这个循环:
for number in numbers[k+1:]:
my_heap.extract_min()
my_heap.insert(number)
该代码不会可靠地产生正确的结果,因为您无条件地删除了最小元素。因此,如果堆已经包含前 k 个元素,但您仍然有数字,它将用较小的元素替换较大的元素。您需要修改您的代码:
for number in numbers[k+1:]:
if (number < my_heap.get_min()) // get_min returns the minimum element, but doesn't remove it
my_heap.extract_min()
my_heap.insert(number)
这不仅会给你正确的答案,而且会减少你在一般情况下必须进行的插入和删除的次数,从而加快你的程序。
您的堆实现也不是最佳的。使 heapify
函数迭代而不是递归,您将节省大量时间,并且您有足够的机会优化该函数。
正如我上面提到的,quick select 是一个 O(n) 算法。通常,当 k 大于 n 的大约 2% 时,quick select 更快。因此,如果您要 select 从 1,000 项列表中选择 100 项,快速 select 几乎肯定会更快。如果您要从 1,000,000 的列表中 selecting 100,那么堆 selection 方法应该更快。
排序不应该比快速 select 快,也不应该比堆 selection 快,除非你 select 的百分比非常大(超过 60%,我预计)的项目。
几年前我对此做了一些相当详细的分析。看我的博客 post, When theory meets practice.