Numpy 查找二维数组中公共点的索引

Numpy Find Indices of Common Points in 2D Array

系统

OS: Windows 10 (x64),内部版本 1909
Python版本:3.8.10
Numpy 版本:1.21.2

问题

给定两个 2D (N, 3) Numpy 数组 (x, y, z) 浮点数据点,Pythonic(矢量化)方法是什么来找到一个数组中点的索引等于另一个数组中的点?

(注意: 我的问题不同之处在于我需要这个来处理真实世界的数据集,其中两个数据集可能因浮点错误而不同. 详情请往下看)

历史

非常相似的问题已经被问过很多次了:

  1. test for membership in a 2d numpy array
  2. Get indices of intersecting rows of Numpy 2d Array
  3. Find indices of rows of numpy 2d array in another 2D array
  4. Indices of intersecting rows of Numpy 2d Array

之前的尝试

提供了一个工作列表理解解决方案,但我正在寻找一个可以很好地扩展到大型数据集(即数百万个点)的解决方案:

代码:

import numpy as np

if __name__ == "__main__":
    big_array = np.array(
        [
            [1.0, 2.0, 1.2],
            [5.0, 3.0, 0.12],
            [-1.0, 14.0, 0.0],
            [-9.0, 0.0, 13.0],
        ]
    )

    small_array = np.array(
        [
            [5.0, 3.0, 0.12],
            [-9.0, 0.0, 13.0],
        ]
    )

    inds = [
        ndx
        for ndx, barr in enumerate(big_array)
        for sarr in small_array
        if all(sarr == barr)
    ]

    print(inds)

输出:

[1, 2]

尝试 SO Post 3 (similar to SO Post 2) 的解决方案,但使用浮点数不起作用(我怀疑需要使用 np.isclose 的东西):

代码3:

import numpy as np

if __name__ == "__main__":
    big_array = np.array(
        [
            [1.0, 2.0, 1.2],
            [5.0, 3.0, 0.12],
            [-1.0, 14.0, 0.0],
            [-9.0, 0.0, 13.0],
        ],
        dtype=float,
    )

    small_array = np.array(
        [
            [5.0, 3.0, 0.12],
            [-9.0, 0.0, 13.0],
        ],
        dtype=float,
    )

    inds = np.nonzero(
        np.in1d(big_array.view("f,f").reshape(-1), small_array.view("f,f").reshape(-1))
    )[0]

    print(inds)

输出3:

[ 3  4  5  8  9 10 11]

我的尝试

我已经尝试 numpy.isinnp.allnp.argwhere

inds = np.argwhere(np.all(np.isin(big_array, small_array), axis=1)).reshape(-1)

有效(并且,我认为,更具可读性和可理解性;即 pythonic),但不适用于包含浮点错误的真实数据集:

import numpy as np

if __name__ == "__main__":
    big_array = np.array(
        [
            [1.0, 2.0, 1.2],
            [5.0, 3.0, 0.12],
            [-1.0, 14.0, 0.0],
            [-9.0, 0.0, 13.0],
        ],
        dtype=float,
    )

    small_array = np.array(
        [
            [5.0, 3.0, 0.12],
            [-9.0, 0.0, 13.0],
        ],
        dtype=float,
    )

    small_array_fpe = np.array(
        [
            [5.0 + 1e-9, 3.0 + 1e-9, 0.12 + 1e-9],
            [-9.0 + 1e-9, 0.0 + 1e-9, 13.0 + 1e-9],
        ],
        dtype=float,
    )

    inds_no_fpe = np.argwhere(np.all(np.isin(big_array, small_array), axis=1)).reshape(-1)

    inds_with_fpe = np.argwhere(
        np.all(np.isin(big_array, small_array_fpe), axis=1)
    ).reshape(-1)

    print(f"No Floating Point Error: {inds_no_fpe}")
    print(f"With Floating Point Error: {inds_with_fpe}")

    print(f"Are 5.0 and 5.0+1e-9 close?: {np.isclose(5.0, 5.0 + 1e-9)}")

输出:

No Floating Point Error: [1 3]
With Floating Point Error: []
Are 5.0 and 5.0+1e-9 close?: True

如何通过合并 np.isclose 使我的上述解决方案(在有浮点错误的数据集上)起作用?欢迎使用其他解决方案。

注意: 因为 small_arraybig_array 的子集,直接使用 np.isclose 是行不通的,因为形状不会广播:

np.isclose(big_array, small_array_fpe)

产量

ValueError: operands could not be broadcast together with shapes (4,3) (2,3)

更新

目前,我唯一可行的解​​决方案是

inds_with_fpe = [
    ndx
    for ndx, barr in enumerate(big_array)
    for sarr in small_array_fpe
    if np.all(np.isclose(sarr, barr))
]

感谢@AndrasDeak 的回答

以下代码片段

inds_with_fpe = np.argwhere(
    np.isclose(small_array_fpe, big_array[:, None, :]).all(-1).any(-1)
).reshape(-1)

将使代码工作。现在对应的输出是:

No Floating Point Error: [1 3]
With Floating Point Error: [1, 3]
Are 5.0 and 5.0+1e-9 close?: True
上面的

None 创建了一个新轴(与 np.newaxis 相同)。这会将 big_array 数组的形状更改为 (4, 1, 3),它遵守广播规则并允许 np.isclose 到 运行。也就是说,big_array现在是一组4 1 x 3点,并且由于big_array中的一个轴是1,所以small_array_fpe可以广播到2 1 x 3 数组(即形状 (2, 1, 3))和元素可以按元素进行比较。

结果是一个(4, 2, 3)布尔数组; big_array 的每个元素都与 small_array_fpe 的每个元素进行元素比较,并且它们接近(在特定公差范围内)的组件是 returned。由于 all 是作为对象方法而不是 numpy 函数调用的,因此函数的第一个参数实际上是 axis 而不是输入数组。因此,上述函数中的 -1 表示“数组的最后一个轴”。

我们首先 return (4, 2, 3) 数组的索引都是 True(即所有 (x, y, z) 分量都相等),这会产生 4 x 2 大批。其中任何一个 Truebig_array 中点相等的相应索引,产生一个 4 x 1 数组。

argwhere returns 索引按元素分组,因此其形状通常为 (number nonzero items, num dims of input array),因此我们将其展平为 1d 数组 reshape(-1) .

不幸的是,这需要二次方的内存量 w.r.t。每个数组中的点数,因为我们必须 运行 遍历 big_array 的每个元素并对照 small_array_fpe 的每个元素检查它。比如在一组另外10000个点中搜索10000个点,对于32位浮点数据,需要

Memory = 10000 * 10000 * 4 * 8 = 32 GiB RAM!

如果有人能设计出一个具有更快 运行 时间和合理内存量的解决方案,那就太棒了!

仅供参考:

from timeit import timeit

import numpy as np

big_array = np.array(
    [
        [1.0, 2.0, 1.2],
        [5.0, 3.0, 0.12],
        [-1.0, 14.0, 0.0],
        [-9.0, 0.0, 13.0],
    ],
    dtype=float,
)

small_array = np.array(
    [
        [5.0 + 1e-9, 3.0 + 1e-9, 0.12 + 1e-9],
        [10.0, 2.0, 5.8],
        [-9.0 + 1e-9, 0.0 + 1e-9, 13.0 + 1e-9],
    ],
    dtype=float,
)


def approach01():
    return [
        ndx
        for ndx, barr in enumerate(big_array)
        for sarr in small_array
        if np.all(np.isclose(sarr, barr))
    ]


def approach02():
    return np.argwhere(
        np.isclose(small_array, big_array[:, None, :]).all(-1).any(-1)
    ).reshape(-1)


if __name__ == "__main__":
    time01 = timeit(
        "approach01()",
        number=10000,
        setup="import numpy as np; from __main__ import approach01",
    )
    time02 = timeit(
        "approach02()",
        number=10000,
        setup="import numpy as np; from __main__ import approach02",
    )

    print(f"Approach 1 (List Comprehension): {time01}")
    print(f"Approach 2 (Vectorized): {time02}")

输出:

Approach 1 (List Comprehension): 8.1180582
Approach 2 (Vectorized): 0.9656997

我不会给出任何代码,但我已经大规模处理过类似的问题。我怀疑要使用这些方法中的任何一种获得不错的性能,您需要在 C 中实现核心(您可能会逃避使用 numba)。

如果您的两个数组都很大,那么有几种方法可以奏效。 这些主要归结为构建一个结构,该结构可用于从一个数组中找到一个点的最近邻居,然后在另一个数据集中查询它以获取每个点。

为此,我之前使用了 Kd 树方法和基于网格的方法。

基于网格的方法的基础是

  • 找到第一个数组的 3D 范围。
  • 将这个区域分成 LNM 个 bin。
  • 对于第二个数组中的每个输入点,找到它的 bin。匹配它的任何点都将在该箱中。

您需要处理的极端情况是

  • 如果该点落在一个 bin 的边缘,或者离 bin 的边界足够近以至于被认为等于它的点可能落在另一个 bin 中 - 那么您需要搜索多个 bin 以查找其“等于”。
  • 如果该点落在所有 bin 之外,但靠近边缘,则“等于”它的点可能落在附近的 bin 中。

缺点是这对非均匀分布的数据不利。

优点是比较简单。统一数据的预期 运行 时间为 n1 * n2 / (L*N*M)(与 n1*n2 相比)。通常你 select L,N,M 这样就变成了 O(n log(n))。您还可以通过对第二个数组进行排序来进一步提升 bin 的重用性。并行化也相对容易(合并和搜索)

K-d 树方法类似。 IIRC 它给出了 O(n log(n)) 行为,但实现起来比较棘手,数据结构的构建也很难并行化)。它往往不是缓存友好的,这可能意味着虽然它的渐近 运行-time 比基于网格的方法更好,但它在实践中可能 运行s 更慢。但是它确实为非均匀分布的数据提供了更好的保证。

正如@Michael Anderson 已经提到的,这可以使用 kd-tree 来实现。与您的回答相比,此解决方案使用的是绝对错误。这是否可以接受取决于问题。

例子

import numpy as np
from scipy import spatial

def find_nearest(big_array,small_array,tolerance):
    tree_big=spatial.cKDTree(big_array)
    tree_small=spatial.cKDTree(small_array)
    return tree_small.query_ball_tree(tree_big,r=tolerance)

计时

big_array=np.random.rand(100_000,3)
small_array=np.random.rand(1_000,3)
big_array[1000:2000]=small_array

%timeit find_nearest(big_array,small_array,1e-9) #find all pairs within a distance of 1e-9
#55.7 ms ± 830 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

#A. Hendry
%timeit np.argwhere(np.isclose(small_array, big_array[:, None, :]).all(-1).any(-1)).reshape(-1)
#3.24 s ± 19 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)