Numpy 查找二维数组中公共点的索引
Numpy Find Indices of Common Points in 2D Array
系统
OS: Windows 10 (x64),内部版本 1909
Python版本:3.8.10
Numpy 版本:1.21.2
问题
给定两个 2D (N, 3)
Numpy 数组 (x, y, z)
浮点数据点,Pythonic(矢量化)方法是什么来找到一个数组中点的索引等于另一个数组中的点?
(注意: 我的问题不同之处在于我需要这个来处理真实世界的数据集,其中两个数据集可能因浮点错误而不同. 详情请往下看)
历史
非常相似的问题已经被问过很多次了:
- test for membership in a 2d numpy array
- Get indices of intersecting rows of Numpy 2d Array
- Find indices of rows of numpy 2d array in another 2D array
- Indices of intersecting rows of Numpy 2d Array
之前的尝试
提供了一个工作列表理解解决方案,但我正在寻找一个可以很好地扩展到大型数据集(即数百万个点)的解决方案:
代码:
import numpy as np
if __name__ == "__main__":
big_array = np.array(
[
[1.0, 2.0, 1.2],
[5.0, 3.0, 0.12],
[-1.0, 14.0, 0.0],
[-9.0, 0.0, 13.0],
]
)
small_array = np.array(
[
[5.0, 3.0, 0.12],
[-9.0, 0.0, 13.0],
]
)
inds = [
ndx
for ndx, barr in enumerate(big_array)
for sarr in small_array
if all(sarr == barr)
]
print(inds)
输出:
[1, 2]
尝试 SO Post 3 (similar to SO Post 2) 的解决方案,但使用浮点数不起作用(我怀疑需要使用 np.isclose
的东西):
代码3:
import numpy as np
if __name__ == "__main__":
big_array = np.array(
[
[1.0, 2.0, 1.2],
[5.0, 3.0, 0.12],
[-1.0, 14.0, 0.0],
[-9.0, 0.0, 13.0],
],
dtype=float,
)
small_array = np.array(
[
[5.0, 3.0, 0.12],
[-9.0, 0.0, 13.0],
],
dtype=float,
)
inds = np.nonzero(
np.in1d(big_array.view("f,f").reshape(-1), small_array.view("f,f").reshape(-1))
)[0]
print(inds)
输出3:
[ 3 4 5 8 9 10 11]
我的尝试
我已经尝试 numpy.isin
与 np.all
和 np.argwhere
inds = np.argwhere(np.all(np.isin(big_array, small_array), axis=1)).reshape(-1)
有效(并且,我认为,更具可读性和可理解性;即 pythonic),但不适用于包含浮点错误的真实数据集:
import numpy as np
if __name__ == "__main__":
big_array = np.array(
[
[1.0, 2.0, 1.2],
[5.0, 3.0, 0.12],
[-1.0, 14.0, 0.0],
[-9.0, 0.0, 13.0],
],
dtype=float,
)
small_array = np.array(
[
[5.0, 3.0, 0.12],
[-9.0, 0.0, 13.0],
],
dtype=float,
)
small_array_fpe = np.array(
[
[5.0 + 1e-9, 3.0 + 1e-9, 0.12 + 1e-9],
[-9.0 + 1e-9, 0.0 + 1e-9, 13.0 + 1e-9],
],
dtype=float,
)
inds_no_fpe = np.argwhere(np.all(np.isin(big_array, small_array), axis=1)).reshape(-1)
inds_with_fpe = np.argwhere(
np.all(np.isin(big_array, small_array_fpe), axis=1)
).reshape(-1)
print(f"No Floating Point Error: {inds_no_fpe}")
print(f"With Floating Point Error: {inds_with_fpe}")
print(f"Are 5.0 and 5.0+1e-9 close?: {np.isclose(5.0, 5.0 + 1e-9)}")
输出:
No Floating Point Error: [1 3]
With Floating Point Error: []
Are 5.0 and 5.0+1e-9 close?: True
如何通过合并 np.isclose
使我的上述解决方案(在有浮点错误的数据集上)起作用?欢迎使用其他解决方案。
注意: 因为 small_array
是 big_array
的子集,直接使用 np.isclose
是行不通的,因为形状不会广播:
np.isclose(big_array, small_array_fpe)
产量
ValueError: operands could not be broadcast together with shapes (4,3) (2,3)
更新
目前,我唯一可行的解决方案是
inds_with_fpe = [
ndx
for ndx, barr in enumerate(big_array)
for sarr in small_array_fpe
if np.all(np.isclose(sarr, barr))
]
感谢@AndrasDeak 的回答
以下代码片段
inds_with_fpe = np.argwhere(
np.isclose(small_array_fpe, big_array[:, None, :]).all(-1).any(-1)
).reshape(-1)
将使代码工作。现在对应的输出是:
No Floating Point Error: [1 3]
With Floating Point Error: [1, 3]
Are 5.0 and 5.0+1e-9 close?: True
上面的 None
创建了一个新轴(与 np.newaxis
相同)。这会将 big_array
数组的形状更改为 (4, 1, 3)
,它遵守广播规则并允许 np.isclose
到 运行。也就是说,big_array
现在是一组4
1 x 3
点,并且由于big_array
中的一个轴是1,所以small_array_fpe
可以广播到2
1 x 3
数组(即形状 (2, 1, 3)
)和元素可以按元素进行比较。
结果是一个(4, 2, 3)
布尔数组; big_array
的每个元素都与 small_array_fpe
的每个元素进行元素比较,并且它们接近(在特定公差范围内)的组件是 returned。由于 all
是作为对象方法而不是 numpy 函数调用的,因此函数的第一个参数实际上是 axis
而不是输入数组。因此,上述函数中的 -1
表示“数组的最后一个轴”。
我们首先 return (4, 2, 3)
数组的索引都是 True
(即所有 (x, y, z)
分量都相等),这会产生 4 x 2
大批。其中任何一个 True
是 big_array
中点相等的相应索引,产生一个 4 x 1
数组。
argwhere
returns 索引按元素分组,因此其形状通常为 (number nonzero items, num dims of input array)
,因此我们将其展平为 1d
数组 reshape(-1)
.
不幸的是,这需要二次方的内存量 w.r.t。每个数组中的点数,因为我们必须 运行 遍历 big_array
的每个元素并对照 small_array_fpe
的每个元素检查它。比如在一组另外10000个点中搜索10000个点,对于32位浮点数据,需要
Memory = 10000 * 10000 * 4 * 8 = 32 GiB RAM!
如果有人能设计出一个具有更快 运行 时间和合理内存量的解决方案,那就太棒了!
仅供参考:
from timeit import timeit
import numpy as np
big_array = np.array(
[
[1.0, 2.0, 1.2],
[5.0, 3.0, 0.12],
[-1.0, 14.0, 0.0],
[-9.0, 0.0, 13.0],
],
dtype=float,
)
small_array = np.array(
[
[5.0 + 1e-9, 3.0 + 1e-9, 0.12 + 1e-9],
[10.0, 2.0, 5.8],
[-9.0 + 1e-9, 0.0 + 1e-9, 13.0 + 1e-9],
],
dtype=float,
)
def approach01():
return [
ndx
for ndx, barr in enumerate(big_array)
for sarr in small_array
if np.all(np.isclose(sarr, barr))
]
def approach02():
return np.argwhere(
np.isclose(small_array, big_array[:, None, :]).all(-1).any(-1)
).reshape(-1)
if __name__ == "__main__":
time01 = timeit(
"approach01()",
number=10000,
setup="import numpy as np; from __main__ import approach01",
)
time02 = timeit(
"approach02()",
number=10000,
setup="import numpy as np; from __main__ import approach02",
)
print(f"Approach 1 (List Comprehension): {time01}")
print(f"Approach 2 (Vectorized): {time02}")
输出:
Approach 1 (List Comprehension): 8.1180582
Approach 2 (Vectorized): 0.9656997
我不会给出任何代码,但我已经大规模处理过类似的问题。我怀疑要使用这些方法中的任何一种获得不错的性能,您需要在 C 中实现核心(您可能会逃避使用 numba)。
如果您的两个数组都很大,那么有几种方法可以奏效。
这些主要归结为构建一个结构,该结构可用于从一个数组中找到一个点的最近邻居,然后在另一个数据集中查询它以获取每个点。
为此,我之前使用了 Kd 树方法和基于网格的方法。
基于网格的方法的基础是
- 找到第一个数组的 3D 范围。
- 将这个区域分成 LNM 个 bin。
- 对于第二个数组中的每个输入点,找到它的 bin。匹配它的任何点都将在该箱中。
您需要处理的极端情况是
- 如果该点落在一个 bin 的边缘,或者离 bin 的边界足够近以至于被认为等于它的点可能落在另一个 bin 中 - 那么您需要搜索多个 bin 以查找其“等于”。
- 如果该点落在所有 bin 之外,但靠近边缘,则“等于”它的点可能落在附近的 bin 中。
缺点是这对非均匀分布的数据不利。
优点是比较简单。统一数据的预期 运行 时间为 n1 * n2 / (L*N*M)
(与 n1*n2 相比)。通常你 select L,N,M 这样就变成了 O(n log(n))
。您还可以通过对第二个数组进行排序来进一步提升 bin 的重用性。并行化也相对容易(合并和搜索)
K-d 树方法类似。 IIRC 它给出了 O(n log(n))
行为,但实现起来比较棘手,数据结构的构建也很难并行化)。它往往不是缓存友好的,这可能意味着虽然它的渐近 运行-time 比基于网格的方法更好,但它在实践中可能 运行s 更慢。但是它确实为非均匀分布的数据提供了更好的保证。
正如@Michael Anderson 已经提到的,这可以使用 kd-tree 来实现。与您的回答相比,此解决方案使用的是绝对错误。这是否可以接受取决于问题。
例子
import numpy as np
from scipy import spatial
def find_nearest(big_array,small_array,tolerance):
tree_big=spatial.cKDTree(big_array)
tree_small=spatial.cKDTree(small_array)
return tree_small.query_ball_tree(tree_big,r=tolerance)
计时
big_array=np.random.rand(100_000,3)
small_array=np.random.rand(1_000,3)
big_array[1000:2000]=small_array
%timeit find_nearest(big_array,small_array,1e-9) #find all pairs within a distance of 1e-9
#55.7 ms ± 830 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
#A. Hendry
%timeit np.argwhere(np.isclose(small_array, big_array[:, None, :]).all(-1).any(-1)).reshape(-1)
#3.24 s ± 19 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
系统
OS: Windows 10 (x64),内部版本 1909
Python版本:3.8.10
Numpy 版本:1.21.2
问题
给定两个 2D (N, 3)
Numpy 数组 (x, y, z)
浮点数据点,Pythonic(矢量化)方法是什么来找到一个数组中点的索引等于另一个数组中的点?
(注意: 我的问题不同之处在于我需要这个来处理真实世界的数据集,其中两个数据集可能因浮点错误而不同. 详情请往下看)
历史
非常相似的问题已经被问过很多次了:
- test for membership in a 2d numpy array
- Get indices of intersecting rows of Numpy 2d Array
- Find indices of rows of numpy 2d array in another 2D array
- Indices of intersecting rows of Numpy 2d Array
之前的尝试
代码
import numpy as np
if __name__ == "__main__":
big_array = np.array(
[
[1.0, 2.0, 1.2],
[5.0, 3.0, 0.12],
[-1.0, 14.0, 0.0],
[-9.0, 0.0, 13.0],
]
)
small_array = np.array(
[
[5.0, 3.0, 0.12],
[-9.0, 0.0, 13.0],
]
)
inds = [
ndx
for ndx, barr in enumerate(big_array)
for sarr in small_array
if all(sarr == barr)
]
print(inds)
输出
[1, 2]
尝试 SO Post 3 (similar to SO Post 2) 的解决方案,但使用浮点数不起作用(我怀疑需要使用 np.isclose
的东西):
代码3:
import numpy as np
if __name__ == "__main__":
big_array = np.array(
[
[1.0, 2.0, 1.2],
[5.0, 3.0, 0.12],
[-1.0, 14.0, 0.0],
[-9.0, 0.0, 13.0],
],
dtype=float,
)
small_array = np.array(
[
[5.0, 3.0, 0.12],
[-9.0, 0.0, 13.0],
],
dtype=float,
)
inds = np.nonzero(
np.in1d(big_array.view("f,f").reshape(-1), small_array.view("f,f").reshape(-1))
)[0]
print(inds)
输出3:
[ 3 4 5 8 9 10 11]
我的尝试
我已经尝试 numpy.isin
与 np.all
和 np.argwhere
inds = np.argwhere(np.all(np.isin(big_array, small_array), axis=1)).reshape(-1)
有效(并且,我认为,更具可读性和可理解性;即 pythonic),但不适用于包含浮点错误的真实数据集:
import numpy as np
if __name__ == "__main__":
big_array = np.array(
[
[1.0, 2.0, 1.2],
[5.0, 3.0, 0.12],
[-1.0, 14.0, 0.0],
[-9.0, 0.0, 13.0],
],
dtype=float,
)
small_array = np.array(
[
[5.0, 3.0, 0.12],
[-9.0, 0.0, 13.0],
],
dtype=float,
)
small_array_fpe = np.array(
[
[5.0 + 1e-9, 3.0 + 1e-9, 0.12 + 1e-9],
[-9.0 + 1e-9, 0.0 + 1e-9, 13.0 + 1e-9],
],
dtype=float,
)
inds_no_fpe = np.argwhere(np.all(np.isin(big_array, small_array), axis=1)).reshape(-1)
inds_with_fpe = np.argwhere(
np.all(np.isin(big_array, small_array_fpe), axis=1)
).reshape(-1)
print(f"No Floating Point Error: {inds_no_fpe}")
print(f"With Floating Point Error: {inds_with_fpe}")
print(f"Are 5.0 and 5.0+1e-9 close?: {np.isclose(5.0, 5.0 + 1e-9)}")
输出:
No Floating Point Error: [1 3]
With Floating Point Error: []
Are 5.0 and 5.0+1e-9 close?: True
如何通过合并 np.isclose
使我的上述解决方案(在有浮点错误的数据集上)起作用?欢迎使用其他解决方案。
注意: 因为 small_array
是 big_array
的子集,直接使用 np.isclose
是行不通的,因为形状不会广播:
np.isclose(big_array, small_array_fpe)
产量
ValueError: operands could not be broadcast together with shapes (4,3) (2,3)
更新
目前,我唯一可行的解决方案是
inds_with_fpe = [
ndx
for ndx, barr in enumerate(big_array)
for sarr in small_array_fpe
if np.all(np.isclose(sarr, barr))
]
感谢@AndrasDeak 的回答
以下代码片段
inds_with_fpe = np.argwhere(
np.isclose(small_array_fpe, big_array[:, None, :]).all(-1).any(-1)
).reshape(-1)
将使代码工作。现在对应的输出是:
No Floating Point Error: [1 3]
With Floating Point Error: [1, 3]
Are 5.0 and 5.0+1e-9 close?: True
上面的 None
创建了一个新轴(与 np.newaxis
相同)。这会将 big_array
数组的形状更改为 (4, 1, 3)
,它遵守广播规则并允许 np.isclose
到 运行。也就是说,big_array
现在是一组4
1 x 3
点,并且由于big_array
中的一个轴是1,所以small_array_fpe
可以广播到2
1 x 3
数组(即形状 (2, 1, 3)
)和元素可以按元素进行比较。
结果是一个(4, 2, 3)
布尔数组; big_array
的每个元素都与 small_array_fpe
的每个元素进行元素比较,并且它们接近(在特定公差范围内)的组件是 returned。由于 all
是作为对象方法而不是 numpy 函数调用的,因此函数的第一个参数实际上是 axis
而不是输入数组。因此,上述函数中的 -1
表示“数组的最后一个轴”。
我们首先 return (4, 2, 3)
数组的索引都是 True
(即所有 (x, y, z)
分量都相等),这会产生 4 x 2
大批。其中任何一个 True
是 big_array
中点相等的相应索引,产生一个 4 x 1
数组。
argwhere
returns 索引按元素分组,因此其形状通常为 (number nonzero items, num dims of input array)
,因此我们将其展平为 1d
数组 reshape(-1)
.
不幸的是,这需要二次方的内存量 w.r.t。每个数组中的点数,因为我们必须 运行 遍历 big_array
的每个元素并对照 small_array_fpe
的每个元素检查它。比如在一组另外10000个点中搜索10000个点,对于32位浮点数据,需要
Memory = 10000 * 10000 * 4 * 8 = 32 GiB RAM!
如果有人能设计出一个具有更快 运行 时间和合理内存量的解决方案,那就太棒了!
仅供参考:
from timeit import timeit
import numpy as np
big_array = np.array(
[
[1.0, 2.0, 1.2],
[5.0, 3.0, 0.12],
[-1.0, 14.0, 0.0],
[-9.0, 0.0, 13.0],
],
dtype=float,
)
small_array = np.array(
[
[5.0 + 1e-9, 3.0 + 1e-9, 0.12 + 1e-9],
[10.0, 2.0, 5.8],
[-9.0 + 1e-9, 0.0 + 1e-9, 13.0 + 1e-9],
],
dtype=float,
)
def approach01():
return [
ndx
for ndx, barr in enumerate(big_array)
for sarr in small_array
if np.all(np.isclose(sarr, barr))
]
def approach02():
return np.argwhere(
np.isclose(small_array, big_array[:, None, :]).all(-1).any(-1)
).reshape(-1)
if __name__ == "__main__":
time01 = timeit(
"approach01()",
number=10000,
setup="import numpy as np; from __main__ import approach01",
)
time02 = timeit(
"approach02()",
number=10000,
setup="import numpy as np; from __main__ import approach02",
)
print(f"Approach 1 (List Comprehension): {time01}")
print(f"Approach 2 (Vectorized): {time02}")
输出:
Approach 1 (List Comprehension): 8.1180582
Approach 2 (Vectorized): 0.9656997
我不会给出任何代码,但我已经大规模处理过类似的问题。我怀疑要使用这些方法中的任何一种获得不错的性能,您需要在 C 中实现核心(您可能会逃避使用 numba)。
如果您的两个数组都很大,那么有几种方法可以奏效。 这些主要归结为构建一个结构,该结构可用于从一个数组中找到一个点的最近邻居,然后在另一个数据集中查询它以获取每个点。
为此,我之前使用了 Kd 树方法和基于网格的方法。
基于网格的方法的基础是
- 找到第一个数组的 3D 范围。
- 将这个区域分成 LNM 个 bin。
- 对于第二个数组中的每个输入点,找到它的 bin。匹配它的任何点都将在该箱中。
您需要处理的极端情况是
- 如果该点落在一个 bin 的边缘,或者离 bin 的边界足够近以至于被认为等于它的点可能落在另一个 bin 中 - 那么您需要搜索多个 bin 以查找其“等于”。
- 如果该点落在所有 bin 之外,但靠近边缘,则“等于”它的点可能落在附近的 bin 中。
缺点是这对非均匀分布的数据不利。
优点是比较简单。统一数据的预期 运行 时间为 n1 * n2 / (L*N*M)
(与 n1*n2 相比)。通常你 select L,N,M 这样就变成了 O(n log(n))
。您还可以通过对第二个数组进行排序来进一步提升 bin 的重用性。并行化也相对容易(合并和搜索)
K-d 树方法类似。 IIRC 它给出了 O(n log(n))
行为,但实现起来比较棘手,数据结构的构建也很难并行化)。它往往不是缓存友好的,这可能意味着虽然它的渐近 运行-time 比基于网格的方法更好,但它在实践中可能 运行s 更慢。但是它确实为非均匀分布的数据提供了更好的保证。
正如@Michael Anderson 已经提到的,这可以使用 kd-tree 来实现。与您的回答相比,此解决方案使用的是绝对错误。这是否可以接受取决于问题。
例子
import numpy as np
from scipy import spatial
def find_nearest(big_array,small_array,tolerance):
tree_big=spatial.cKDTree(big_array)
tree_small=spatial.cKDTree(small_array)
return tree_small.query_ball_tree(tree_big,r=tolerance)
计时
big_array=np.random.rand(100_000,3)
small_array=np.random.rand(1_000,3)
big_array[1000:2000]=small_array
%timeit find_nearest(big_array,small_array,1e-9) #find all pairs within a distance of 1e-9
#55.7 ms ± 830 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
#A. Hendry
%timeit np.argwhere(np.isclose(small_array, big_array[:, None, :]).all(-1).any(-1)).reshape(-1)
#3.24 s ± 19 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)