在 numpy 数组中获取两个相邻的非 nan 值

Get two neighboring non-nan values in numpy array

假设我有一个 numpy 数组

my_array = [0.2, 0.3, nan, nan, nan, 0.1, nan, 0.5, nan]

对于每个 nan 值,我想提取该点左侧和右侧的两个非 nan 值(如果合适,也可以提取单个值)。所以我希望我的输出类似于

output = [[0.3,0.1], [0.3,0.1], [0.3,0.1], [0.1,0.5], [0.5]]

我正在考虑遍历 my_array 中的所有值,然后找到那些是 nan 的值,但我不确定如何进行下一部分找到最近的非 nan 值。

使用 pandas 和 numpy:

s = pd.Series([0.2, 0.3, nan, nan, nan, 0.1, nan, 0.5, nan])
m = s.isna()
a = np.vstack((s.ffill()[m], s.bfill()[m]))
out = a[:,~np.isnan(a).any(0)].T.tolist()

输出:

[[0.3, 0.1], [0.3, 0.1], [0.3, 0.1], [0.1, 0.5]]

注意。您可以选择保留或删除包含 NaN 的列表。

使用 NaN:

out = a.T.tolist()

[[0.3, 0.1], [0.3, 0.1], [0.3, 0.1], [0.1, 0.5], [0.5, nan]]

处理单个元素的替代方法:

s = pd.Series([0.2, 0.3, nan, nan, nan, 0.1, nan, 0.5, nan])
m = s.isna()

(pd
 .concat((s.ffill()[m], s.bfill()[m]), axis=1)
 .stack()
 .groupby(level=0).agg(list)
 .to_list()
 )

输出:

[[0.3, 0.1], [0.3, 0.1], [0.3, 0.1], [0.1, 0.5], [0.5]]

不如@mozway 的回答优雅,但最后一个列表只有一个元素:

pd.DataFrame({
    'left':arr.ffill(), 
    'right': arr.bfill()
}).loc[arr.isna()].apply(lambda row: row.dropna().to_list(), axis=1).to_list()

为了教育,我将 post 一个漂亮的 straight-forward 算法来实现这个结果,它通过找到最接近的值的左侧和右侧的索引来工作NaN 的每个索引,并在末尾过滤掉任何 infs:

def get_neighbors(x: np.ndarray) -> list:
    mask = np.isnan(x)
    nan_idxs, *_ = np.where(mask)
    val_idxs, *_ = np.where(~mask)

    neighbors = []
    for nan_idx in nan_idxs:
        L, R = -float("inf"), float("inf")
        for val_idx in val_idxs:
            if val_idx < nan_idx:
                L = max(L, val_idx)
            else:
                R = min(R, val_idx)
        # casting to list isn't strictly necessary, you'll just end up with a list of arrays
        neighbors.append(list(x[[i for i in (L, R) if i > 0 and i < float("inf")]]))

    return neighbors

输出:

>>> get_neighbors(my_array)
[[0.3, 0.1], [0.3, 0.1], [0.3, 0.1], [0.1, 0.5], [0.5]]

嵌套的 for 循环的 worst-case 运行时间为 O((n / 2)^2),其中 nx 的元素数(最坏情况恰好发生在一半的元素是 NaN)。

我很想知道如何仅使用 NumPy 来解决这个问题作为练习。几个小时后我可以找到解决方案 :),但我认为与 Mozway 提到的 pandas 相比效率低下,我没有进一步优化代码(可以优化;如果条件可以治愈并合并到其他部分):

my_array = np.array([np.nan, np.nan, 0.2, 0.3, np.nan, np.nan, np.nan, 0.1, 0.7, np.nan, 0.5])

nans = np.isnan(my_array).astype(np.int8)           # [1 1 0 0 1 1 1 0 0 1 0]
zeros = np.where(nans == 0)[0]                      # [ 2  3  7  8 10]
diff_nan = np.diff(nans)                            # [ 0 -1  0  1  0  0 -1  0  1 -1]
start = np.where(diff_nan == 1)[0]                  # [3 8]
end = np.where(diff_nan == -1)[0] + 1               # [ 2  7 10]

mask_start_nan = np.isnan(my_array[0])              # True
mask_end_nan = np.isnan(my_array[-1])               # False

if mask_end_nan: start = start[:-1]                 # [3 8]
if mask_start_nan: end = end[1:]                    # [ 7 10]
inds = np.dstack([start, end]).squeeze()            # [[ 3  7] [ 8 10]]
initial = my_array[inds]                            # [[0.3 0.1] [0.7 0.5]]

repeats = np.diff(np.where(np.concatenate(([nans[0]], nans[:-1] != nans[1:], [True])))[0])[::2]    # [2 3 1]
if mask_end_nan: repeats = repeats[:-1]             # [2 3 1]
if mask_start_nan: repeats = repeats[1:]            # [3 1]

result = np.repeat(initial, repeats, axis=0)        # [[0.3 0.1] [0.3 0.1] [0.3 0.1] [0.7 0.5]]
if mask_end_nan: result = np.array([*result, np.array(my_array[zeros[-1]])], dtype=object)
if mask_start_nan: result = np.array([np.array(my_array[zeros[0]]), *result], dtype=object)

# [array(0.2) array([0.3, 0.1]) array([0.3, 0.1]) array([0.3, 0.1]) array([0.7, 0.5])]

我不知道 NumPy 是否有更简单的解决方案;我实现了我的想法。相信这段代码可以有很大的提升(有空我会做的)