如何使用 numpy 向量化解决这个问题
How to solve this using numpy vectorization
我有一个非常大的输入 numpy 数组和一本字典。字典规定了 numpy 数组中的值应该更新到什么。我可以使用 for 循环来完成,但它非常耗时,我可以使用 numpy 向量化来解决这个问题吗?
输入:
arr_to_check = numpy.array([['A', 20],['B', 100],['C', 80],['D', 90], ['E', 100]]) # actual length is ~10^8
max_possible = {'A': 25, 'B': 40, 'C': 90, 'D': 50, 'F': 100, 'G': 90} # actual length is ~10^3
扩展结果:
[['A', '20'], # do not change, because 20 < 25 --- max possible for 'A' is 25.
['B', '0'], # change to 0, because 100 > 50 --- max possible for 'B' is 40.
['C', '80'], # do not change, because 80 < 90
['D', '0'], # change to 0, because 90 > 50 --- max possible for 'D' is 50.
['E', '100' ]]
这是循环解决方案:
for i in range(arr_to_check.shape[0]):
row = arr_to_check[i]
if row[0] in max_possible and int(row[1]) > max_possible[row[0]]:
row[1] = 0
这里有一种方法可以完成您的要求(已更新 以简化代码)。
先做几点说明:
- numpy 数组必须是同类类型,所以你在问题中显示的数字将由 numpy 转换为字符串以匹配标签的数据类型(如果 pandas 是一个选项,它可能允许你有数字列 co-exist 和不同的字符串列)。
- 虽然我一直采用结果来匹配原始的同类数据类型(字符串),但您可以提前停止并使用中间的一维数值结果,如果这就是您所需要的。
- 数字类型我用的是
int
,需要的话可以改成float
import numpy
arr_to_check = numpy.array([['A', 20],['B', 100],['C', 80],['D', 90], ['E', 100]])
max_possible = {'A': 25, 'B': 40, 'C': 90, 'D': 50, 'F': 100, 'G': 90}
print('arr_to_check:'); print(arr_to_check)
aT = arr_to_check.T
labels = aT[0,:]
values = aT[1,:].astype(int)
print('labels:'); print(labels)
print('values:'); print(values)
for label, value in max_possible.items():
curMask = (labels == label)
values[curMask] *= (values[curMask] <= value)
print('values:'); print(values)
aT[1,:] = values
arr_to_check = aT.T
print('arr_to_check:'); print(arr_to_check)
输入:
arr_to_check:
[['A' '20']
['B' '100']
['C' '80']
['D' '90']
['E' '100']]
输出:
labels:
['A' 'B' 'C' 'D' 'E']
values:
[ 20 100 80 90 100]
values:
[ 20 0 80 0 100]
arr_to_check:
[['A' '20']
['B' '0']
['C' '80']
['D' '0']
['E' '100']]
解释:
- 转置输入,以便我们可以直接对数值向量 (
values
) 使用向量化运算。
- 遍历
max_possible
中的每个 key/value 对,如果标签行的 max_possible
中的值已被破坏,则使用矢量化公式将 values
乘以 0 (in labels
) 匹配 max_possible
. 中的键
- 使用
values
更新原始的numpy数组。
正如其他人指出的,numpy 数组是同类的,您的输出元素都将具有 str。如果没问题,您可以使用 apply_along_axis
:
t = lambda x: [x[0],0] if x[0] in max_possible and int(x[1]) > max_possible[x[0]] else x
numpy.apply_along_axis(t, 1, arr_to_check)
正如其他人所说,您应该在 numpy 数组中只使用数字。
所以你可以得到这样的数据:
arr_to_check = np.array([[0, 20],[1, 100],[2, 80],[3, 90], [4, 100]])
max_possible = np.array([25, 40, 90, 50, np.inf, 100, 90])
这里我假设 'A': 0, 'B': 1, ...
注意,这样不仅字符串被替换成了数字,dict也被替换成了一个Numpy数组,其中max_possible[i]是i-th字符串的最大值,方便后续操作[=12] =]
现在,你得到你想要的:
m = max_possible.take(arr_to_check.T[0])
m1 = np.array([arr_to_check.T[0], np.minimum(arr_to_check.T[1], m)])
m1.T
第 1 行将每个键的最大值放入 m。
第二行将您的键作为第一行放入 m1,以及您的值的最小值和每个键的最大值
第 3 行转置为您的结果:
数组([[ 0., 20.],
[ 1., 40.],
[ 2., 80.],
[ 3., 50.],
[ 4., 100.]])
运行 您的代码:
In [362]: %%timeit arr = arr_to_check.copy()
...: for i in range(arr.shape[0]):
...: row = arr[i]
...: if row[0] in max_possible and int(row[1]) > max_possible[row[0]]:
...: row[1] = 0
...:
14.1 µs ± 203 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
像这样在数组上迭代比使用列表慢,所以让我们尝试一个纯列表解决方案:
In [372]: alist_to_check = [['A', 20],['B', 100],['C', 80],['D', 90], ['E', 100]]
...: max_possible = {'A': 25, 'B': 40, 'C': 90, 'D': 50, 'F': 100, 'G': 90}
使用带有 if/else 表达式的列表理解:
In [373]: [[k,0] if k in max_possible and v>max_possible[k] else [k,v] for k,v in alist_to_check]
Out[373]: [['A', 20], ['B', 0], ['C', 80], ['D', 0], ['E', 100]]
In [374]: timeit [[k,0] if k in max_possible and v>max_possible[k] else [k,v] for k,v in alist_to_check]
1.45 µs ± 3.2 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each)
其中一个答案建议 apply_along_axis
- 键重新定义为整数。我的时间到了
In [366]: timeit np.apply_along_axis(t, 1, arr_to_check)
108 µs ± 2 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
举个小例子,纯列表方法是最快的。对于非常大的情况,我们可能会将其转换为可扩展性更好的 numpy 问题,但我还没有查看这些选项。
使用结构化数组
我们可以将列表变成结构化数组。这保留了 string 和 int dtypes:
In [398]: arr = np.array([tuple(kv) for kv in alist_to_check],'U10,int')
In [399]: arr
Out[399]:
array([('A', 20), ('B', 100), ('C', 80), ('D', 90), ('E', 100)],
dtype=[('f0', '<U10'), ('f1', '<i4')])
In [400]: arr['f0']
Out[400]: array(['A', 'B', 'C', 'D', 'E'], dtype='<U10')
In [401]: arr['f1']
Out[401]: array([ 20, 100, 80, 90, 100])
如果 max_possible
相对于列表较小,迭代其项目并设置结构化数组的相应元素可能是最有效的。例如:
def foo(alist):
arr = np.array([tuple(kv) for kv in alist],'U10,int')
for k,v in max_possible.items():
idx = np.nonzero((arr['f0']==k) & (arr['f1']>v))[0]
arr['f1'][idx] = 0
return arr
In [395]: foo(alist_to_check)
Out[395]:
array([('A', 20), ('B', 0), ('C', 80), ('D', 0), ('E', 100)],
dtype=[('f0', '<U10'), ('f1', '<i4')])
对于这个示例,时间不是很好:
In [397]: timeit foo(alist_to_check)
102 µs ± 360 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
对于一个大列表:
In [403]: biglist = alist_to_check*10000
In [409]: timeit foo(biglist)
44.1 ms ± 209 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [410]: timeit [[k,0] if k in max_possible and v>max_possible[k] else [k,v] for k,v in biglist]
14.8 ms ± 682 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
时间还不够好。然而,其中很大一部分是创建结构化数组:
In [411]: timeit arr = np.array([tuple(kv) for kv in biglist],'U10,int')
38.4 ms ± 49.5 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
如果我们已经有了结构化数组,我希望时代会好得多。
奇怪的是,从 biglist
制作一个纯字符串 dtype 数组需要更长的时间:
In [412]: timeit np.array(biglist)
74.2 ms ± 1.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
不过,这确实清楚地表明,使用 dict
和字符串匹配,列表与 numpy
解决方案相比仍然具有竞争力。 numpy
最适合纯数字工作。
我有一个非常大的输入 numpy 数组和一本字典。字典规定了 numpy 数组中的值应该更新到什么。我可以使用 for 循环来完成,但它非常耗时,我可以使用 numpy 向量化来解决这个问题吗?
输入:
arr_to_check = numpy.array([['A', 20],['B', 100],['C', 80],['D', 90], ['E', 100]]) # actual length is ~10^8
max_possible = {'A': 25, 'B': 40, 'C': 90, 'D': 50, 'F': 100, 'G': 90} # actual length is ~10^3
扩展结果:
[['A', '20'], # do not change, because 20 < 25 --- max possible for 'A' is 25.
['B', '0'], # change to 0, because 100 > 50 --- max possible for 'B' is 40.
['C', '80'], # do not change, because 80 < 90
['D', '0'], # change to 0, because 90 > 50 --- max possible for 'D' is 50.
['E', '100' ]]
这是循环解决方案:
for i in range(arr_to_check.shape[0]):
row = arr_to_check[i]
if row[0] in max_possible and int(row[1]) > max_possible[row[0]]:
row[1] = 0
这里有一种方法可以完成您的要求(已更新 以简化代码)。
先做几点说明:
- numpy 数组必须是同类类型,所以你在问题中显示的数字将由 numpy 转换为字符串以匹配标签的数据类型(如果 pandas 是一个选项,它可能允许你有数字列 co-exist 和不同的字符串列)。
- 虽然我一直采用结果来匹配原始的同类数据类型(字符串),但您可以提前停止并使用中间的一维数值结果,如果这就是您所需要的。
- 数字类型我用的是
int
,需要的话可以改成float
import numpy
arr_to_check = numpy.array([['A', 20],['B', 100],['C', 80],['D', 90], ['E', 100]])
max_possible = {'A': 25, 'B': 40, 'C': 90, 'D': 50, 'F': 100, 'G': 90}
print('arr_to_check:'); print(arr_to_check)
aT = arr_to_check.T
labels = aT[0,:]
values = aT[1,:].astype(int)
print('labels:'); print(labels)
print('values:'); print(values)
for label, value in max_possible.items():
curMask = (labels == label)
values[curMask] *= (values[curMask] <= value)
print('values:'); print(values)
aT[1,:] = values
arr_to_check = aT.T
print('arr_to_check:'); print(arr_to_check)
输入:
arr_to_check:
[['A' '20']
['B' '100']
['C' '80']
['D' '90']
['E' '100']]
输出:
labels:
['A' 'B' 'C' 'D' 'E']
values:
[ 20 100 80 90 100]
values:
[ 20 0 80 0 100]
arr_to_check:
[['A' '20']
['B' '0']
['C' '80']
['D' '0']
['E' '100']]
解释:
- 转置输入,以便我们可以直接对数值向量 (
values
) 使用向量化运算。 - 遍历
max_possible
中的每个 key/value 对,如果标签行的max_possible
中的值已被破坏,则使用矢量化公式将values
乘以 0 (inlabels
) 匹配max_possible
. 中的键
- 使用
values
更新原始的numpy数组。
正如其他人指出的,numpy 数组是同类的,您的输出元素都将具有 str。如果没问题,您可以使用 apply_along_axis
:
t = lambda x: [x[0],0] if x[0] in max_possible and int(x[1]) > max_possible[x[0]] else x
numpy.apply_along_axis(t, 1, arr_to_check)
正如其他人所说,您应该在 numpy 数组中只使用数字。 所以你可以得到这样的数据:
arr_to_check = np.array([[0, 20],[1, 100],[2, 80],[3, 90], [4, 100]])
max_possible = np.array([25, 40, 90, 50, np.inf, 100, 90])
这里我假设 'A': 0, 'B': 1, ... 注意,这样不仅字符串被替换成了数字,dict也被替换成了一个Numpy数组,其中max_possible[i]是i-th字符串的最大值,方便后续操作[=12] =]
现在,你得到你想要的:
m = max_possible.take(arr_to_check.T[0])
m1 = np.array([arr_to_check.T[0], np.minimum(arr_to_check.T[1], m)])
m1.T
第 1 行将每个键的最大值放入 m。
第二行将您的键作为第一行放入 m1,以及您的值的最小值和每个键的最大值
第 3 行转置为您的结果:
数组([[ 0., 20.], [ 1., 40.], [ 2., 80.], [ 3., 50.], [ 4., 100.]])
运行 您的代码:
In [362]: %%timeit arr = arr_to_check.copy()
...: for i in range(arr.shape[0]):
...: row = arr[i]
...: if row[0] in max_possible and int(row[1]) > max_possible[row[0]]:
...: row[1] = 0
...:
14.1 µs ± 203 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
像这样在数组上迭代比使用列表慢,所以让我们尝试一个纯列表解决方案:
In [372]: alist_to_check = [['A', 20],['B', 100],['C', 80],['D', 90], ['E', 100]]
...: max_possible = {'A': 25, 'B': 40, 'C': 90, 'D': 50, 'F': 100, 'G': 90}
使用带有 if/else 表达式的列表理解:
In [373]: [[k,0] if k in max_possible and v>max_possible[k] else [k,v] for k,v in alist_to_check]
Out[373]: [['A', 20], ['B', 0], ['C', 80], ['D', 0], ['E', 100]]
In [374]: timeit [[k,0] if k in max_possible and v>max_possible[k] else [k,v] for k,v in alist_to_check]
1.45 µs ± 3.2 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each)
其中一个答案建议 apply_along_axis
- 键重新定义为整数。我的时间到了
In [366]: timeit np.apply_along_axis(t, 1, arr_to_check)
108 µs ± 2 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
举个小例子,纯列表方法是最快的。对于非常大的情况,我们可能会将其转换为可扩展性更好的 numpy 问题,但我还没有查看这些选项。
使用结构化数组
我们可以将列表变成结构化数组。这保留了 string 和 int dtypes:
In [398]: arr = np.array([tuple(kv) for kv in alist_to_check],'U10,int')
In [399]: arr
Out[399]:
array([('A', 20), ('B', 100), ('C', 80), ('D', 90), ('E', 100)],
dtype=[('f0', '<U10'), ('f1', '<i4')])
In [400]: arr['f0']
Out[400]: array(['A', 'B', 'C', 'D', 'E'], dtype='<U10')
In [401]: arr['f1']
Out[401]: array([ 20, 100, 80, 90, 100])
如果 max_possible
相对于列表较小,迭代其项目并设置结构化数组的相应元素可能是最有效的。例如:
def foo(alist):
arr = np.array([tuple(kv) for kv in alist],'U10,int')
for k,v in max_possible.items():
idx = np.nonzero((arr['f0']==k) & (arr['f1']>v))[0]
arr['f1'][idx] = 0
return arr
In [395]: foo(alist_to_check)
Out[395]:
array([('A', 20), ('B', 0), ('C', 80), ('D', 0), ('E', 100)],
dtype=[('f0', '<U10'), ('f1', '<i4')])
对于这个示例,时间不是很好:
In [397]: timeit foo(alist_to_check)
102 µs ± 360 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
对于一个大列表:
In [403]: biglist = alist_to_check*10000
In [409]: timeit foo(biglist)
44.1 ms ± 209 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [410]: timeit [[k,0] if k in max_possible and v>max_possible[k] else [k,v] for k,v in biglist]
14.8 ms ± 682 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
时间还不够好。然而,其中很大一部分是创建结构化数组:
In [411]: timeit arr = np.array([tuple(kv) for kv in biglist],'U10,int')
38.4 ms ± 49.5 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
如果我们已经有了结构化数组,我希望时代会好得多。
奇怪的是,从 biglist
制作一个纯字符串 dtype 数组需要更长的时间:
In [412]: timeit np.array(biglist)
74.2 ms ± 1.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
不过,这确实清楚地表明,使用 dict
和字符串匹配,列表与 numpy
解决方案相比仍然具有竞争力。 numpy
最适合纯数字工作。