如何使用 numpy 向量化解决这个问题

How to solve this using numpy vectorization

我有一个非常大的输入 numpy 数组和一本字典。字典规定了 numpy 数组中的值应该更新到什么。我可以使用 for 循环来完成,但它非常耗时,我可以使用 numpy 向量化来解决这个问题吗?

输入:

arr_to_check = numpy.array([['A', 20],['B', 100],['C', 80],['D', 90], ['E', 100]]) # actual length is ~10^8
max_possible = {'A': 25, 'B': 40, 'C': 90, 'D': 50, 'F': 100, 'G': 90} # actual length is ~10^3

扩展结果:

[['A', '20'], # do not change, because 20 < 25 --- max possible for 'A' is 25.
['B', '0'], # change to 0, because 100 > 50 --- max possible for 'B' is 40.
['C', '80'], # do not change, because 80 < 90
['D', '0'], # change to 0, because 90 > 50 --- max possible for 'D' is 50.
['E', '100' ]] 

这是循环解决方案:

for i in range(arr_to_check.shape[0]):
    row = arr_to_check[i]
    if row[0] in max_possible and int(row[1]) > max_possible[row[0]]:
        row[1] = 0

这里有一种方法可以完成您的要求(已更新 以简化代码)。

先做几点说明:

  • numpy 数组必须是同类类型,所以你在问题中显示的数字将由 numpy 转换为字符串以匹配标签的数据类型(如果 pandas 是一个选项,它可能允许你有数字列 co-exist 和不同的字符串列)。
  • 虽然我一直采用结果来匹配原始的同类数据类型(字符串),但您可以提前停止并使用中间的一维数值结果,如果这就是您所需要的。
  • 数字类型我用的是int,需要的话可以改成float
import numpy
arr_to_check = numpy.array([['A', 20],['B', 100],['C', 80],['D', 90], ['E', 100]])
max_possible = {'A': 25, 'B': 40, 'C': 90, 'D': 50, 'F': 100, 'G': 90}
print('arr_to_check:'); print(arr_to_check)

aT = arr_to_check.T
labels = aT[0,:]
values = aT[1,:].astype(int)
print('labels:'); print(labels)
print('values:'); print(values)

for label, value in max_possible.items():
    curMask = (labels == label)
    values[curMask] *= (values[curMask] <= value)
print('values:'); print(values)

aT[1,:] = values
arr_to_check = aT.T
print('arr_to_check:'); print(arr_to_check)

输入:

arr_to_check:
[['A' '20']
 ['B' '100']
 ['C' '80']
 ['D' '90']
 ['E' '100']]

输出:

labels:
['A' 'B' 'C' 'D' 'E']
values:
[ 20 100  80  90 100]
values:
[ 20   0  80   0 100]
arr_to_check:
[['A' '20']
 ['B' '0']
 ['C' '80']
 ['D' '0']
 ['E' '100']]

解释:

  • 转置输入,以便我们可以直接对数值向量 (values) 使用向量化运算。
  • 遍历 max_possible 中的每个 key/value 对,如果标签行的 max_possible 中的值已被破坏,则使用矢量化公式将 values 乘以 0 (in labels) 匹配 max_possible.
  • 中的键
  • 使用values更新原始的numpy数组。

正如其他人指出的,numpy 数组是同类的,您的输出元素都将具有 str。如果没问题,您可以使用 apply_along_axis:

t = lambda x: [x[0],0] if  x[0] in max_possible and int(x[1]) > max_possible[x[0]] else x
numpy.apply_along_axis(t, 1, arr_to_check)

正如其他人所说,您应该在 numpy 数组中只使用数字。 所以你可以得到这样的数据:

arr_to_check = np.array([[0, 20],[1, 100],[2, 80],[3, 90], [4, 100]])
max_possible = np.array([25, 40, 90, 50, np.inf, 100, 90])

这里我假设 'A': 0, 'B': 1, ... 注意,这样不仅字符串被替换成了数字,dict也被替换成了一个Numpy数组,其中max_possible[i]是i-th字符串的最大值,方便后续操作[=12] =]

现在,你得到你想要的:

m = max_possible.take(arr_to_check.T[0]) 
m1 = np.array([arr_to_check.T[0], np.minimum(arr_to_check.T[1], m)]) 
m1.T
  • 第 1 行将每个键的最大值放入 m。

  • 第二行将您的键作为第一行放入 m1,以及您的值的最小值和每个键的最大值

  • 第 3 行转置为您的结果:

    数组([[ 0., 20.], [ 1., 40.], [ 2., 80.], [ 3., 50.], [ 4., 100.]])

运行 您的代码:

In [362]: %%timeit arr = arr_to_check.copy()
     ...: for i in range(arr.shape[0]):
     ...:     row = arr[i]
     ...:     if row[0] in max_possible and int(row[1]) > max_possible[row[0]]:
     ...:         row[1] = 0
     ...:         
14.1 µs ± 203 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)

像这样在数组上迭代比使用列表慢,所以让我们尝试一个纯列表解决方案:

In [372]: alist_to_check = [['A', 20],['B', 100],['C', 80],['D', 90], ['E', 100]]
     ...: max_possible = {'A': 25, 'B': 40, 'C': 90, 'D': 50, 'F': 100, 'G': 90}

使用带有 if/else 表达式的列表理解:

In [373]: [[k,0] if k in max_possible and v>max_possible[k] else [k,v] for k,v in alist_to_check]
Out[373]: [['A', 20], ['B', 0], ['C', 80], ['D', 0], ['E', 100]]

In [374]: timeit [[k,0] if k in max_possible and v>max_possible[k] else [k,v] for k,v in alist_to_check]
1.45 µs ± 3.2 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each)

其中一个答案建议 apply_along_axis - 键重新定义为整数。我的时间到了

In [366]: timeit np.apply_along_axis(t, 1, arr_to_check)
108 µs ± 2 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

举个小例子,纯列表方法是最快的。对于非常大的情况,我们可能会将其转换为可扩展性更好的 numpy 问题,但我还没有查看这些选项。

使用结构化数组

我们可以将列表变成结构化数组。这保留了 string 和 int dtypes:

In [398]: arr = np.array([tuple(kv) for kv in alist_to_check],'U10,int')

In [399]: arr
Out[399]: 
array([('A',  20), ('B', 100), ('C',  80), ('D',  90), ('E', 100)],
      dtype=[('f0', '<U10'), ('f1', '<i4')])

In [400]: arr['f0']
Out[400]: array(['A', 'B', 'C', 'D', 'E'], dtype='<U10')

In [401]: arr['f1']
Out[401]: array([ 20, 100,  80,  90, 100])

如果 max_possible 相对于列表较小,迭代其项目并设置结构化数组的相应元素可能是最有效的。例如:

def foo(alist):
    arr = np.array([tuple(kv) for kv in alist],'U10,int')
    for k,v in max_possible.items():
        idx = np.nonzero((arr['f0']==k) & (arr['f1']>v))[0]
        arr['f1'][idx] = 0
    return arr

In [395]: foo(alist_to_check)
Out[395]: 
array([('A',  20), ('B',   0), ('C',  80), ('D',   0), ('E', 100)],
      dtype=[('f0', '<U10'), ('f1', '<i4')])

对于这个示例,时间不是很好:

In [397]: timeit foo(alist_to_check)
102 µs ± 360 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

对于一个大列表:

In [403]: biglist = alist_to_check*10000

In [409]: timeit foo(biglist)
44.1 ms ± 209 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

In [410]: timeit [[k,0] if k in max_possible and v>max_possible[k] else [k,v] for k,v in biglist]
14.8 ms ± 682 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

时间还不够好。然而,其中很大一部分是创建结构化数组:

In [411]: timeit arr = np.array([tuple(kv) for kv in biglist],'U10,int')
38.4 ms ± 49.5 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

如果我们已经有了结构化数组,我希望时代会好得多。

奇怪的是,从 biglist 制作一个纯字符串 dtype 数组需要更长的时间:

In [412]: timeit np.array(biglist)
74.2 ms ± 1.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

不过,这确实清楚地表明,使用 dict 和字符串匹配,列表与 numpy 解决方案相比仍然具有竞争力。 numpy 最适合纯数字工作。