根据另一个参考数组从一个数组中选择接近的匹配项
Selecting close matches from one array based on another reference array
我有一个数组 A
和一个引用数组 B
。 A
的大小至少与 B
一样大。例如
A = [2,100,300,793,1300,1500,1810,2400]
B = [4,305,789,1234,1890]
B
实际上是信号在指定时间的峰值位置,而A
包含稍后时间的峰值位置。但是A
中的一些元素其实不是我想要的峰(可能是噪声等),我想根据[=找到A
中的'real'一个12=]。 A
中的'real'个元素应该和B
中的元素接近,在上面给出的例子中,A
中的'real'个应该是[=23] =].在这个例子中应该很明显 100,1500,2400
不是我们想要的,因为它们与 B 中的任何元素都相去甚远。我如何在 [= 中以最 efficient/accurate 的方式对此进行编码33=]?
您可以使用 bsxfun
找到 A
中每个点与 B
中每个值的距离,然后找到 A
中点的索引,即使用 min
.
最接近 B
中的每个值
[dists, ind] = min(abs(bsxfun(@minus, A, B.')), [], 2)
如果您使用的是 R2016b,bsxfun
可以通过自动广播移除
[dists, ind] = min(abs(A - B.'), [], 2);
如果您怀疑 B
中的某些值不是真正的峰值,那么您可以设置一个阈值并删除任何大于该值的距离。
threshold = 90;
ind = ind(dists < threshold);
然后我们可以用ind
索引到A
output = A(ind);
方法 #1: 使用 NumPy broadcasting
,我们可以在输入数组之间寻找绝对元素减法,并使用适当的阈值从中过滤掉不需要的元素A
。对于给定的样本输入,90
的阈值似乎有效。
因此,我们将有一个实现,就像这样 -
thresh = 90
Aout = A[(np.abs(A[:,None] - B) < thresh).any(1)]
样本运行-
In [69]: A
Out[69]: array([ 2, 100, 300, 793, 1300, 1500, 1810, 2400])
In [70]: B
Out[70]: array([ 4, 305, 789, 1234, 1890])
In [71]: A[(np.abs(A[:,None] - B) < 90).any(1)]
Out[71]: array([ 2, 300, 793, 1300, 1810])
方法 #2: 基于 , here's a memory efficient approach using np.searchsorted
,这对于大型数组可能至关重要 -
def searchsorted_filter(a, b, thresh):
choices = np.sort(b) # if b is already sorted, skip it
lidx = np.searchsorted(choices, a, 'left').clip(max=choices.size-1)
ridx = (np.searchsorted(choices, a, 'right')-1).clip(min=0)
cl = np.take(choices,lidx) # Or choices[lidx]
cr = np.take(choices,ridx) # Or choices[ridx]
return a[np.minimum(np.abs(a - cl), np.abs(a - cr)) < thresh]
样本运行-
In [95]: searchsorted_filter(A,B, thresh = 90)
Out[95]: array([ 2, 300, 793, 1300, 1810])
运行时测试
In [104]: A = np.sort(np.random.randint(0,100000,(1000)))
In [105]: B = np.sort(np.random.randint(0,100000,(400)))
In [106]: out1 = A[(np.abs(A[:,None] - B) < 10).any(1)]
In [107]: out2 = searchsorted_filter(A,B, thresh = 10)
In [108]: np.allclose(out1, out2) # Verify results
Out[108]: True
In [109]: %timeit A[(np.abs(A[:,None] - B) < 10).any(1)]
100 loops, best of 3: 2.74 ms per loop
In [110]: %timeit searchsorted_filter(A,B, thresh = 10)
10000 loops, best of 3: 85.3 µs per loop
2018 年 1 月更新,性能进一步提升
我们可以通过使用从 np.searchsorted(..., 'left')
获得的索引以及 absolute
计算来避免 np.searchsorted(..., 'right')
的第二次使用,就像这样 -
def searchsorted_filter_v2(a, b, thresh):
N = len(b)
choices = np.sort(b) # if b is already sorted, skip it
l = np.searchsorted(choices, a, 'left')
l_invalid_mask = l==N
l[l_invalid_mask] = N-1
left_offset = choices[l]-a
left_offset[l_invalid_mask] *= -1
r = (l - (left_offset!=0))
r_invalid_mask = r<0
r[r_invalid_mask] = 0
r += l_invalid_mask
right_offset = a-choices[r]
right_offset[r_invalid_mask] *= -1
out = a[(left_offset < thresh) | (right_offset < thresh)]
return out
更新时间以测试进一步的加速 -
In [388]: np.random.seed(0)
...: A = np.random.randint(0,1000000,(100000))
...: B = np.unique(np.random.randint(0,1000000,(40000)))
...: np.random.shuffle(B)
...: thresh = 10
...:
...: out1 = searchsorted_filter(A, B, thresh)
...: out2 = searchsorted_filter_v2(A, B, thresh)
...: print np.allclose(out1, out2)
True
In [389]: %timeit searchsorted_filter(A, B, thresh)
10 loops, best of 3: 24.2 ms per loop
In [390]: %timeit searchsorted_filter_v2(A, B, thresh)
100 loops, best of 3: 13.9 ms per loop
深入挖掘 -
In [396]: a = A; b = B
In [397]: N = len(b)
...:
...: choices = np.sort(b) # if b is already sorted, skip it
...:
...: l = np.searchsorted(choices, a, 'left')
In [398]: %timeit np.sort(B)
100 loops, best of 3: 2 ms per loop
In [399]: %timeit np.searchsorted(choices, a, 'left')
100 loops, best of 3: 10.3 ms per loop
似乎 searchsorted
和 sort
占用了几乎所有的 运行 时间,它们似乎对这种方法至关重要。因此,似乎无法进一步改进这种基于排序的方法。
您可以使用完全符合您要求的 MATLAB interp1 函数。
选项nearest
用于查找最近点,无需指定阈值。
out = interp1(A, A, B, 'nearest', 'extrap');
与其他方法比较:
A = sort(randi([0,1000000],1,10000));
B = sort(randi([0,1000000],1,4000));
disp('---interp1----------------')
tic
out = interp1(A, A, B, 'nearest', 'extrap');
toc
disp('---subtraction with threshold------')
%numpy version is the same
tic
[dists, ind] = min(abs(bsxfun(@minus, A, B.')), [], 2);
toc
结果:
---interp1----------------
Elapsed time is 0.00778699 seconds.
---subtraction with threshold------
Elapsed time is 0.445485 seconds.
interp1
可用于大于 10000 和 4000 的输入,但在 subtrction
方法中发生内存不足错误。
我有一个数组 A
和一个引用数组 B
。 A
的大小至少与 B
一样大。例如
A = [2,100,300,793,1300,1500,1810,2400]
B = [4,305,789,1234,1890]
B
实际上是信号在指定时间的峰值位置,而A
包含稍后时间的峰值位置。但是A
中的一些元素其实不是我想要的峰(可能是噪声等),我想根据[=找到A
中的'real'一个12=]。 A
中的'real'个元素应该和B
中的元素接近,在上面给出的例子中,A
中的'real'个应该是[=23] =].在这个例子中应该很明显 100,1500,2400
不是我们想要的,因为它们与 B 中的任何元素都相去甚远。我如何在 [= 中以最 efficient/accurate 的方式对此进行编码33=]?
您可以使用 bsxfun
找到 A
中每个点与 B
中每个值的距离,然后找到 A
中点的索引,即使用 min
.
B
中的每个值
[dists, ind] = min(abs(bsxfun(@minus, A, B.')), [], 2)
如果您使用的是 R2016b,bsxfun
可以通过自动广播移除
[dists, ind] = min(abs(A - B.'), [], 2);
如果您怀疑 B
中的某些值不是真正的峰值,那么您可以设置一个阈值并删除任何大于该值的距离。
threshold = 90;
ind = ind(dists < threshold);
然后我们可以用ind
索引到A
output = A(ind);
方法 #1: 使用 NumPy broadcasting
,我们可以在输入数组之间寻找绝对元素减法,并使用适当的阈值从中过滤掉不需要的元素A
。对于给定的样本输入,90
的阈值似乎有效。
因此,我们将有一个实现,就像这样 -
thresh = 90
Aout = A[(np.abs(A[:,None] - B) < thresh).any(1)]
样本运行-
In [69]: A
Out[69]: array([ 2, 100, 300, 793, 1300, 1500, 1810, 2400])
In [70]: B
Out[70]: array([ 4, 305, 789, 1234, 1890])
In [71]: A[(np.abs(A[:,None] - B) < 90).any(1)]
Out[71]: array([ 2, 300, 793, 1300, 1810])
方法 #2: 基于 np.searchsorted
,这对于大型数组可能至关重要 -
def searchsorted_filter(a, b, thresh):
choices = np.sort(b) # if b is already sorted, skip it
lidx = np.searchsorted(choices, a, 'left').clip(max=choices.size-1)
ridx = (np.searchsorted(choices, a, 'right')-1).clip(min=0)
cl = np.take(choices,lidx) # Or choices[lidx]
cr = np.take(choices,ridx) # Or choices[ridx]
return a[np.minimum(np.abs(a - cl), np.abs(a - cr)) < thresh]
样本运行-
In [95]: searchsorted_filter(A,B, thresh = 90)
Out[95]: array([ 2, 300, 793, 1300, 1810])
运行时测试
In [104]: A = np.sort(np.random.randint(0,100000,(1000)))
In [105]: B = np.sort(np.random.randint(0,100000,(400)))
In [106]: out1 = A[(np.abs(A[:,None] - B) < 10).any(1)]
In [107]: out2 = searchsorted_filter(A,B, thresh = 10)
In [108]: np.allclose(out1, out2) # Verify results
Out[108]: True
In [109]: %timeit A[(np.abs(A[:,None] - B) < 10).any(1)]
100 loops, best of 3: 2.74 ms per loop
In [110]: %timeit searchsorted_filter(A,B, thresh = 10)
10000 loops, best of 3: 85.3 µs per loop
2018 年 1 月更新,性能进一步提升
我们可以通过使用从 np.searchsorted(..., 'left')
获得的索引以及 absolute
计算来避免 np.searchsorted(..., 'right')
的第二次使用,就像这样 -
def searchsorted_filter_v2(a, b, thresh):
N = len(b)
choices = np.sort(b) # if b is already sorted, skip it
l = np.searchsorted(choices, a, 'left')
l_invalid_mask = l==N
l[l_invalid_mask] = N-1
left_offset = choices[l]-a
left_offset[l_invalid_mask] *= -1
r = (l - (left_offset!=0))
r_invalid_mask = r<0
r[r_invalid_mask] = 0
r += l_invalid_mask
right_offset = a-choices[r]
right_offset[r_invalid_mask] *= -1
out = a[(left_offset < thresh) | (right_offset < thresh)]
return out
更新时间以测试进一步的加速 -
In [388]: np.random.seed(0)
...: A = np.random.randint(0,1000000,(100000))
...: B = np.unique(np.random.randint(0,1000000,(40000)))
...: np.random.shuffle(B)
...: thresh = 10
...:
...: out1 = searchsorted_filter(A, B, thresh)
...: out2 = searchsorted_filter_v2(A, B, thresh)
...: print np.allclose(out1, out2)
True
In [389]: %timeit searchsorted_filter(A, B, thresh)
10 loops, best of 3: 24.2 ms per loop
In [390]: %timeit searchsorted_filter_v2(A, B, thresh)
100 loops, best of 3: 13.9 ms per loop
深入挖掘 -
In [396]: a = A; b = B
In [397]: N = len(b)
...:
...: choices = np.sort(b) # if b is already sorted, skip it
...:
...: l = np.searchsorted(choices, a, 'left')
In [398]: %timeit np.sort(B)
100 loops, best of 3: 2 ms per loop
In [399]: %timeit np.searchsorted(choices, a, 'left')
100 loops, best of 3: 10.3 ms per loop
似乎 searchsorted
和 sort
占用了几乎所有的 运行 时间,它们似乎对这种方法至关重要。因此,似乎无法进一步改进这种基于排序的方法。
您可以使用完全符合您要求的 MATLAB interp1 函数。
选项nearest
用于查找最近点,无需指定阈值。
out = interp1(A, A, B, 'nearest', 'extrap');
与其他方法比较:
A = sort(randi([0,1000000],1,10000));
B = sort(randi([0,1000000],1,4000));
disp('---interp1----------------')
tic
out = interp1(A, A, B, 'nearest', 'extrap');
toc
disp('---subtraction with threshold------')
%numpy version is the same
tic
[dists, ind] = min(abs(bsxfun(@minus, A, B.')), [], 2);
toc
结果:
---interp1----------------
Elapsed time is 0.00778699 seconds.
---subtraction with threshold------
Elapsed time is 0.445485 seconds.
interp1
可用于大于 10000 和 4000 的输入,但在 subtrction
方法中发生内存不足错误。