替换 numpy 数组中的元素避免循环
Replace elements in numpy array avoiding loops
我有一个相当大的 1d numpy 数组 Xold 给定值。这些值应为
根据 2d numpy 数组 Y 指定的规则替换:
一个例子是
Xold=np.array([0,1,2,3,4])
Y=np.array([[0,0],[1,100],[3,300],[4,400],[2,200]])
每当 Xold 中的值与 Y[:,0] 中的值相同时,Xnew 中的新值应该是 Y[:,1] 中的对应值。这是通过两个嵌套的 for 循环完成的:
Xnew=np.zeros(len(Xold))
for i in range(len(Xold)):
for j in range(len(Y)):
if Xold[i]==Y[j,0]:
Xnew[i]=Y[j,1]
对于给定的示例,这会产生 Xnew=[0,100,200,300,400]
。
但是,对于大型数据集,此过程非常慢。完成此任务的更快、更优雅的方法是什么?
您可以做的第一个改进是使用 numpy 索引,但您仍然有 1 个循环:
for old, new in Y:
Xold[Xold == old] = new
这是一种可能性:
import numpy as np
Xold = np.array([0, 1, 2, 3, 4])
Y = np.array([[0, 0], [1, 100], [3, 300], [4, 400], [2, 200]])
# Check every X value against every Y first value
m = Xold == Y[:, 0, np.newaxis]
# Check which elements in X are among Y first values
# (so values that are not in Y are not replaced)
m_X = np.any(m, axis=0)
# Compute replacement
# Xold * (1 - m_X) are the non-replaced values
# np.sum(Y[:, 1, np.newaxis] * m, axis=0) * m_X are the replaced values
Xnew = Xold * (1 - m_X) + np.sum(Y[:, 1, np.newaxis] * m, axis=0) * m_X
print(Xnew)
输出:
[ 0 100 200 300 400]
此方法或多或少适用于每种情况(未排序的数组、X 中值的多次重复、X 中的值未替换、Y 中的值未替换 X 中的任何内容),除非您为相同的内容提供两次替换Y 中的值,无论如何这都是错误的。但是,它的时间和 space 复杂度是 X 和 Y 大小的乘积。如果您的问题有额外的限制(数据已排序,没有重复等),则可能会做得更好。例如,如果 X 的排序没有重复元素,并且 Y 中的每个值都替换 X 中的一个值(如您的示例),这可能会更快:
import numpy as np
Xold = np.array([0, 1, 2, 3, 4])
Y = np.array([[0, 0], [1, 100], [3, 300], [4, 400], [2, 200]])
idx = np.searchsorted(Xold, Y[:, 0])
Xnew = Xold.copy()
Xnew[idx] = Y[:, 1]
print(Xnew)
# [ 0 100 200 300 400]
当 Y
的第一列中的数据不一定排序时,我们可以使用 np.searchsorted
作为一般情况 -
sidx = Y[:,0].argsort()
out = Y[sidx[np.searchsorted(Y[:,0], Xold, sorter=sidx)],1]
样本运行-
In [53]: Xold
Out[53]: array([14, 10, 12, 13, 11])
In [54]: Y
Out[54]:
array([[ 10, 0],
[ 11, 100],
[ 13, 300],
[ 14, 400],
[ 12, 200]])
In [55]: sidx = Y[:,0].argsort()
...: out = Y[sidx[np.searchsorted(Y[:,0], Xold, sorter=sidx)],1]
In [56]: out
Out[56]: array([400, 0, 200, 300, 100])
如果不是所有的元素都有相应的可用映射,那么我们需要做更多的工作,像这样-
sidx = Y[:,0].argsort()
sorted_indx = np.searchsorted(Y[:,0], Xold, sorter=sidx)
sorted_indx[sorted_indx==len(sidx)] = len(sidx)-1
idx_out = sidx[sorted_indx]
out = Y[idx_out,1]
out[Y[idx_out,0]!=Xold] = 0 # NA values as 0s
您可以将 slicing
特征与 argsort
方法结合使用。
Xnew = Y[Y[:,1].argsort()][:, 1][Xold]
输出
array([ 0, 100, 200, 300, 400])
pd.Series.map()
的解决方案
如果您愿意使用 Pandas 库,您也可以使用 .map()
:
以矢量化方式执行此操作
>>> import pandas as pd
>>> pd.Series(Xold).map(pd.Series(Y[:, 1], index=Y[:, 0]))
0 0
1 100
2 200
3 300
4 400
dtype: int64
>>> pd.Series(Xold).map(pd.Series(Y[:, 1], index=Y[:, 0])).values
array([ 0, 100, 200, 300, 400])
对于签名 a.map(b)
,a
在 b
的索引中查找其对应的条目,并映射到 b
中的相应值。
b
这里是 pd.Series(Y[:, 1], index=Y[:, 0])
,它使用第 0 列作为索引,第 1 列作为映射到的值。
直接使用pandas.core.algorithms
Under the hood,这将使用 .get_indexer()
和 Cython 实现的 take_1d()
:
indexer = mapper.index.get_indexer(values)
new_values = algorithms.take_1d(mapper._values, indexer)
知道了,如果数组真的很大,您可以像这样减少一些开销:
from pandas.core import algorithms
indexer = pd.Index(Y[:, 0]).get_indexer(Xold)
mapped = algorithms.take_1d(Y[:, 1], indexer)
numpy_indexed 包(免责声明;我是它的作者)包含一个有效的向量化函数,可以解决一般问题:
import numpy_indexed as npi
Xnew = npi.remap(Xold, keys=Y[:, 0], values=Y[:, 1])
也就是说,这适用于任何 dtype,或者当要替换的键和值本身是 ndarray 时,您会得到一个 kwarg 来指定如何对缺失的元素做出反应。
不确定它与 pandas 性能相比如何;但是这个库中的一个设计选择是执行像这样的基本操作(或进行分组等)不应该涉及创建一个全新的数据类型,如 Series 或 Table,这总是困扰我使用pandas 对于这类事情。
您可以使用 y = dict(Y)
将 Y 转换为字典,然后 运行 以下列表理解
[y[i] if i in y.keys() else i for i in Xold]
选择最快的方法
这个问题的答案提供了一系列很好的方法来替换 numpy 数组中的元素。让我们检查一下,哪个最快。
TL;DR: Numpy 索引是赢家
def meth1(): # suggested by @Slam
for old, new in Y:
Xold[Xold == old] = new
def meth2(): # suggested by myself, convert y_dict = dict(Y) first
[y_dict[i] if i in y_dict.keys() else i for i in Xold]
def meth3(): # suggested by @Eelco Hoogendoom, import numpy_index as npi first
npi.remap(Xold, keys=Y[:, 0], values=Y[:, 1])
def meth4(): # suggested by @Brad Solomon, import pandas as pd first
pd.Series(Xold).map(pd.Series(Y[:, 1], index=Y[:, 0])).values
# suggested by @jdehesa. create Xnew = Xold.copy() and index
# idx = np.searchsorted(Xold, Y[:, 0]) first
def meth5():
Xnew[idx] = Y[:, 1]
结果并不令人意外
In [39]: timeit.timeit(meth1, number=1000000)
Out[39]: 12.08
In [40]: timeit.timeit(meth2, number=1000000)
Out[40]: 2.87
In [38]: timeit.timeit(meth3, number=1000000)
Out[38]: 55.39
In [12]: timeit.timeit(meth4, number=1000000)
Out[12]: 256.84
In [50]: timeit.timeit(meth5, number=1000000)
Out[50]: 1.12
所以,好的旧列表理解是第二快的,获胜的方法是 numpy 索引结合 searchsorted()
。
我有一个相当大的 1d numpy 数组 Xold 给定值。这些值应为 根据 2d numpy 数组 Y 指定的规则替换: 一个例子是
Xold=np.array([0,1,2,3,4])
Y=np.array([[0,0],[1,100],[3,300],[4,400],[2,200]])
每当 Xold 中的值与 Y[:,0] 中的值相同时,Xnew 中的新值应该是 Y[:,1] 中的对应值。这是通过两个嵌套的 for 循环完成的:
Xnew=np.zeros(len(Xold))
for i in range(len(Xold)):
for j in range(len(Y)):
if Xold[i]==Y[j,0]:
Xnew[i]=Y[j,1]
对于给定的示例,这会产生 Xnew=[0,100,200,300,400]
。
但是,对于大型数据集,此过程非常慢。完成此任务的更快、更优雅的方法是什么?
您可以做的第一个改进是使用 numpy 索引,但您仍然有 1 个循环:
for old, new in Y:
Xold[Xold == old] = new
这是一种可能性:
import numpy as np
Xold = np.array([0, 1, 2, 3, 4])
Y = np.array([[0, 0], [1, 100], [3, 300], [4, 400], [2, 200]])
# Check every X value against every Y first value
m = Xold == Y[:, 0, np.newaxis]
# Check which elements in X are among Y first values
# (so values that are not in Y are not replaced)
m_X = np.any(m, axis=0)
# Compute replacement
# Xold * (1 - m_X) are the non-replaced values
# np.sum(Y[:, 1, np.newaxis] * m, axis=0) * m_X are the replaced values
Xnew = Xold * (1 - m_X) + np.sum(Y[:, 1, np.newaxis] * m, axis=0) * m_X
print(Xnew)
输出:
[ 0 100 200 300 400]
此方法或多或少适用于每种情况(未排序的数组、X 中值的多次重复、X 中的值未替换、Y 中的值未替换 X 中的任何内容),除非您为相同的内容提供两次替换Y 中的值,无论如何这都是错误的。但是,它的时间和 space 复杂度是 X 和 Y 大小的乘积。如果您的问题有额外的限制(数据已排序,没有重复等),则可能会做得更好。例如,如果 X 的排序没有重复元素,并且 Y 中的每个值都替换 X 中的一个值(如您的示例),这可能会更快:
import numpy as np
Xold = np.array([0, 1, 2, 3, 4])
Y = np.array([[0, 0], [1, 100], [3, 300], [4, 400], [2, 200]])
idx = np.searchsorted(Xold, Y[:, 0])
Xnew = Xold.copy()
Xnew[idx] = Y[:, 1]
print(Xnew)
# [ 0 100 200 300 400]
当 Y
的第一列中的数据不一定排序时,我们可以使用 np.searchsorted
作为一般情况 -
sidx = Y[:,0].argsort()
out = Y[sidx[np.searchsorted(Y[:,0], Xold, sorter=sidx)],1]
样本运行-
In [53]: Xold
Out[53]: array([14, 10, 12, 13, 11])
In [54]: Y
Out[54]:
array([[ 10, 0],
[ 11, 100],
[ 13, 300],
[ 14, 400],
[ 12, 200]])
In [55]: sidx = Y[:,0].argsort()
...: out = Y[sidx[np.searchsorted(Y[:,0], Xold, sorter=sidx)],1]
In [56]: out
Out[56]: array([400, 0, 200, 300, 100])
如果不是所有的元素都有相应的可用映射,那么我们需要做更多的工作,像这样-
sidx = Y[:,0].argsort()
sorted_indx = np.searchsorted(Y[:,0], Xold, sorter=sidx)
sorted_indx[sorted_indx==len(sidx)] = len(sidx)-1
idx_out = sidx[sorted_indx]
out = Y[idx_out,1]
out[Y[idx_out,0]!=Xold] = 0 # NA values as 0s
您可以将 slicing
特征与 argsort
方法结合使用。
Xnew = Y[Y[:,1].argsort()][:, 1][Xold]
输出
array([ 0, 100, 200, 300, 400])
pd.Series.map()
的解决方案
如果您愿意使用 Pandas 库,您也可以使用 .map()
:
>>> import pandas as pd
>>> pd.Series(Xold).map(pd.Series(Y[:, 1], index=Y[:, 0]))
0 0
1 100
2 200
3 300
4 400
dtype: int64
>>> pd.Series(Xold).map(pd.Series(Y[:, 1], index=Y[:, 0])).values
array([ 0, 100, 200, 300, 400])
对于签名 a.map(b)
,a
在 b
的索引中查找其对应的条目,并映射到 b
中的相应值。
b
这里是 pd.Series(Y[:, 1], index=Y[:, 0])
,它使用第 0 列作为索引,第 1 列作为映射到的值。
直接使用pandas.core.algorithms
Under the hood,这将使用 .get_indexer()
和 Cython 实现的 take_1d()
:
indexer = mapper.index.get_indexer(values)
new_values = algorithms.take_1d(mapper._values, indexer)
知道了,如果数组真的很大,您可以像这样减少一些开销:
from pandas.core import algorithms
indexer = pd.Index(Y[:, 0]).get_indexer(Xold)
mapped = algorithms.take_1d(Y[:, 1], indexer)
numpy_indexed 包(免责声明;我是它的作者)包含一个有效的向量化函数,可以解决一般问题:
import numpy_indexed as npi
Xnew = npi.remap(Xold, keys=Y[:, 0], values=Y[:, 1])
也就是说,这适用于任何 dtype,或者当要替换的键和值本身是 ndarray 时,您会得到一个 kwarg 来指定如何对缺失的元素做出反应。
不确定它与 pandas 性能相比如何;但是这个库中的一个设计选择是执行像这样的基本操作(或进行分组等)不应该涉及创建一个全新的数据类型,如 Series 或 Table,这总是困扰我使用pandas 对于这类事情。
您可以使用 y = dict(Y)
将 Y 转换为字典,然后 运行 以下列表理解
[y[i] if i in y.keys() else i for i in Xold]
选择最快的方法
这个问题的答案提供了一系列很好的方法来替换 numpy 数组中的元素。让我们检查一下,哪个最快。
TL;DR: Numpy 索引是赢家
def meth1(): # suggested by @Slam
for old, new in Y:
Xold[Xold == old] = new
def meth2(): # suggested by myself, convert y_dict = dict(Y) first
[y_dict[i] if i in y_dict.keys() else i for i in Xold]
def meth3(): # suggested by @Eelco Hoogendoom, import numpy_index as npi first
npi.remap(Xold, keys=Y[:, 0], values=Y[:, 1])
def meth4(): # suggested by @Brad Solomon, import pandas as pd first
pd.Series(Xold).map(pd.Series(Y[:, 1], index=Y[:, 0])).values
# suggested by @jdehesa. create Xnew = Xold.copy() and index
# idx = np.searchsorted(Xold, Y[:, 0]) first
def meth5():
Xnew[idx] = Y[:, 1]
结果并不令人意外
In [39]: timeit.timeit(meth1, number=1000000)
Out[39]: 12.08
In [40]: timeit.timeit(meth2, number=1000000)
Out[40]: 2.87
In [38]: timeit.timeit(meth3, number=1000000)
Out[38]: 55.39
In [12]: timeit.timeit(meth4, number=1000000)
Out[12]: 256.84
In [50]: timeit.timeit(meth5, number=1000000)
Out[50]: 1.12
所以,好的旧列表理解是第二快的,获胜的方法是 numpy 索引结合 searchsorted()
。