Improving runtime of Python NumPy code
I have a piece of code that reassigns bins over a large numpy array. Basically, the elements of the large array have been sampled at different frequencies, and the final goal is to rebin the entire array into the fixed bins freq_bins. The code is rather slow for the arrays I have. Is there a good way to improve its runtime? A speedup by a factor of a few would do for now. Maybe some numba magic would help.
import numpy as np
import time
division = 90
freq_division = 50
cd = 3000
boost_factor = np.random.rand(division, division, cd)
freq_bins = np.linspace(1, 60, freq_division)
es = np.random.randint(1,10, size = (cd, freq_division))
final_emit = np.zeros((division, division, freq_division))
time1 = time.time()
for i in xrange(division):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    sky_by_cap = np.einsum('ij, jk->ijk', boost_factor[i], es)
    # index of the fixed bin each boosted frequency falls into
    freq_index = np.digitize(fre_boost, freq_bins)
    freq_index_reshaped = freq_index.reshape(division*cd, -1)
    freq_index = None
    sky_by_cap_reshaped = sky_by_cap.reshape(freq_index_reshaped.shape)
    to_bin_emit = np.zeros(freq_index_reshaped.shape)
    row_index = np.arange(freq_index_reshaped.shape[0]).reshape(-1, 1)
    # unbuffered scatter-add of the emission into the fixed bins, row by row
    np.add.at(to_bin_emit, (row_index, freq_index_reshaped), sky_by_cap_reshaped)
    to_bin_emit = to_bin_emit.reshape(fre_boost.shape)
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    final_emit[i] = np.sum(to_bin_emit, axis=1)
print(time.time()-time1)
I think replacing the einsum calls with direct multiplication improves the performance slightly:
import numpy as np
import time
division = 90
freq_division = 50
cd = 3000
boost_factor = np.random.rand(division, division, cd)
freq_bins = np.linspace(1, 60, freq_division)
es = np.random.randint(1,10, size = (cd, freq_division))
final_emit = np.zeros((division, division, freq_division))
time1 = time.time()
for i in xrange(division):
    fre_boost = boost_factor[i][:, :, None]*freq_bins[None, None, :]
    sky_by_cap = boost_factor[i][:, :, None]*es[None, :, :]
    freq_index = np.digitize(fre_boost, freq_bins)
    freq_index_reshaped = freq_index.reshape(division*cd, -1)
    freq_index = None
    sky_by_cap_reshaped = sky_by_cap.reshape(freq_index_reshaped.shape)
    to_bin_emit = np.zeros(freq_index_reshaped.shape)
    row_index = np.arange(freq_index_reshaped.shape[0]).reshape(-1, 1)
    np.add.at(to_bin_emit, (row_index, freq_index_reshaped), sky_by_cap_reshaped)
    to_bin_emit = to_bin_emit.reshape(fre_boost.shape)
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    final_emit[i] = np.sum(to_bin_emit, axis=1)
print(time.time()-time1)
Your code is rather slow because of np.add.at; I believe it would be much faster with np.bincount, although I couldn't get it to work cleanly for the multidimensional arrays you have. Perhaps someone here can add to that.
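A minimal sketch of that idea (my own, not tested against the full pipeline): offset each row's bin indices so that every row maps to a disjoint range, flatten everything, and let np.bincount do the whole scatter-add in one call, replacing the np.add.at line inside the loop:

    # Sketch only: assumes every digitized index is < freq_division, which holds
    # for the data in the question (fre_boost never reaches freq_bins[-1]).
    n_rows, n_bins = freq_index_reshaped.shape
    flat_index = (freq_index_reshaped + np.arange(n_rows)[:, None] * n_bins).ravel()
    to_bin_emit = np.bincount(flat_index,
                              weights=sky_by_cap_reshaped.ravel(),
                              minlength=n_rows * n_bins).reshape(n_rows, n_bins)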
This looks like it parallelizes trivially:
- You have an outer loop that runs 90 times.
- Each iteration, you don't mutate any shared arrays except final_emit
- …and even that, only a single unique row of it.
- Most of the work inside the loop appears to be numpy array-wide operations, which release the GIL.
So (using the futures backport of concurrent.futures, since you appear to be on Python 2.7):
import numpy as np
import time
import futures
division = 90
freq_division = 50
cd = 3000
boost_factor = np.random.rand(division, division, cd)
freq_bins = np.linspace(1, 60, freq_division)
es = np.random.randint(1,10, size = (cd, freq_division))
final_emit = np.zeros((division, division, freq_division))
def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ...
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    return np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    for i, row in enumerate(x.map(dostuff, xrange(division))):
        final_emit[i] = row
If this works, there are two tweaks you could try, either of which may be more efficient. We don't really care about the order the results come back in, but map queues them up in order, which wastes a bit of space and time. I don't think it will make much difference (presumably the vast majority of your time is spent doing the computation, not writing out the results), but without profiling your code it's hard to be sure. There are two easy ways around the problem.
Using as_completed lets us use the results in whatever order they finish, rather than in the order we queued them. Something like this:
def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ...
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    return i, np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    fs = [x.submit(dostuff, i) for i in xrange(division)]
    for f in futures.as_completed(fs):
        i, row = f.result()
        final_emit[i] = row
Alternatively, we can have the function insert the rows directly instead of returning them. That means we're now mutating a shared object from multiple threads, so I think a lock is needed here, although I'm not positive (numpy's rules are a bit complicated, and I haven't read your code that thoroughly…). But it probably won't hurt performance significantly, and it's easy. So:
import numpy as np
import threading
# etc.

final_emit = np.zeros((division, division, freq_division))
final_emit_lock = threading.Lock()

def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ...
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    with final_emit_lock:
        final_emit[i] = np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    # drain the iterator so any exception raised inside dostuff is re-raised here
    list(x.map(dostuff, xrange(division)))
The max_workers=8 in all of my examples should be tuned for your machine. Too many threads is bad, because they start fighting each other instead of parallelizing; too few is even worse, because some of your cores just sit there idle.
If you want this to run on a variety of machines rather than tuning it for each one, the best guess (for 2.7) is usually:
import multiprocessing
# ...
with futures.ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) as x:
But if you want to squeeze the maximum performance out of a particular machine, you should test different values. In particular, for a typical quad-core laptop with hyperthreading, the ideal value can be anywhere from 4 to 8, depending on the exact work you're doing, and it's easier to just try all the values than to try to predict.
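If you do want to measure it, a tiny sweep like the one below is usually enough (my own sketch, not part of the original answer; run_all is a hypothetical wrapper around the threaded loop shown above):

    import multiprocessing
    import time

    def run_all(n_workers):
        # hypothetical wrapper around the threaded version of the loop
        with futures.ThreadPoolExecutor(max_workers=n_workers) as x:
            for i, row in enumerate(x.map(dostuff, xrange(division))):
                final_emit[i] = row

    # try every worker count from 1 up to twice the core count and print the timings
    for n_workers in range(1, 2 * multiprocessing.cpu_count() + 1):
        t0 = time.time()
        run_all(n_workers)
        print(n_workers, time.time() - t0)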
Keep your code simple instead of optimizing it
If you know which algorithm you want to write, write a simple reference implementation first. From there you can go two ways with Python: you can try to vectorize the code, or you can compile the code to get good performance.
Even if np.einsum and np.add.at were implemented in Numba, it would be hard for any compiler to produce efficient binary code from your example.
The only thing I have rewritten is a more efficient method of digitizing scalar values.
Edit
There is a more efficient implementation of digitizing scalar values in the Numba source code.
Code
import numpy as np
import numba as nb

# Variant 1: scalar digitize taken from the Numba source (binary search).
#From Numba source
#Copyright (c) 2012, Anaconda, Inc.
#All rights reserved.
@nb.njit(fastmath=True)
def digitize(x, bins, right=False):
    # bins are monotonically-increasing
    n = len(bins)
    lo = 0
    hi = n

    if right:
        if np.isnan(x):
            # Find the first nan (i.e. the last from the end of bins,
            # since there shouldn't be many of them in practice)
            for i in range(n, 0, -1):
                if not np.isnan(bins[i - 1]):
                    return i
            return 0
        while hi > lo:
            mid = (lo + hi) >> 1
            if bins[mid] < x:
                # mid is too low => narrow to upper bins
                lo = mid + 1
            else:
                # mid is too high, or is a NaN => narrow to lower bins
                hi = mid
    else:
        if np.isnan(x):
            # NaNs end up in the last bin
            return n
        while hi > lo:
            mid = (lo + hi) >> 1
            if bins[mid] <= x:
                # mid is too low => narrow to upper bins
                lo = mid + 1
            else:
                # mid is too high, or is a NaN => narrow to lower bins
                hi = mid

    return lo

# Variant 2: simple linear-scan digitize. Defining it here overrides Variant 1,
# so keep only one of the two definitions active when benchmarking.
@nb.njit(fastmath=True)
def digitize(value, bins):
    if value < bins[0]:
        return 0
    if value >= bins[bins.shape[0]-1]:
        return bins.shape[0]
    for l in range(1, bins.shape[0]):
        if value >= bins[l-1] and value < bins[l]:
            return l

@nb.njit(fastmath=True, parallel=True)
def inner_loop(boost_factor, freq_bins, es):
    res = np.zeros((boost_factor.shape[0], freq_bins.shape[0]), dtype=np.float64)
    for i in nb.prange(boost_factor.shape[0]):
        for j in range(boost_factor.shape[1]):
            for k in range(freq_bins.shape[0]):
                ind = nb.int64(digitize(boost_factor[i, j]*freq_bins[k], freq_bins))
                res[i, ind] += boost_factor[i, j]*es[j, k]*freq_bins[ind]
    return res

@nb.njit(fastmath=True)
def calc_nb(division, freq_division, cd, boost_factor, freq_bins, es):
    final_emit = np.empty((division, division, freq_division), np.float64)
    for i in range(division):
        final_emit[i, :, :] = inner_loop(boost_factor[i], freq_bins, es)
    return final_emit
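For reference, a minimal usage sketch (my addition, reusing the array shapes from the question); the first call is excluded from the timing because it includes Numba's JIT compilation:

    import time

    division, freq_division, cd = 90, 50, 3000
    boost_factor = np.random.rand(division, division, cd)
    freq_bins = np.linspace(1, 60, freq_division)
    # cast es to float to keep the Numba kernel's arithmetic homogeneous
    es = np.random.randint(1, 10, size=(cd, freq_division)).astype(np.float64)

    calc_nb(division, freq_division, cd, boost_factor, freq_bins, es)  # warm-up / compile
    t0 = time.time()
    final_emit = calc_nb(division, freq_division, cd, boost_factor, freq_bins, es)
    print(time.time() - t0)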
Performance
(Quad-core i7)
original code: 118.5 s
calc_nb: 4.14 s
calc_nb (with the digitize implementation from the Numba source): 2.66 s