如何获取二维 Numpy 数组中的秩

How to obtain the Ranks in a 2D Numpy Array

我正在尝试获取二维数组中的等级,沿轴=1,没有重复的等级。

假设我有以下数组:

array([[4.32, 6.43, 4.32, 2.21],
       [0.65,  nan, 8.12, 6.43],
       [ nan, 4.32, 1.23, 1.23]])

对于 'hi-lo' 排名,我希望得到以下结果:

array([[ 2.,  1.,  3.,  4.],
       [ 3., nan,  1.,  2.],
       [nan,  1.,  2.,  3.]])

以及以下结果,对于 'lo-hi' 排名:

array([[ 2.,  4.,  3.,  1.],
       [ 1., nan,  3.,  2.],
       [nan,  3.,  1.,  2.]])

我一直在使用 scipy.stats.rankdata,但此解决方案非常耗时(对于大型阵列)。此外,我正在使用的代码(如下所示)依赖于 np.apply_along_axis,我知道它效率不高。我知道 scipy.stats.rankdata 接受轴参数,但它背后的代码恰好使用 np.apply_along_axis(参见 here)。

def f(array, order='hi-lo'):
    array = np.asarray(array)
    lo_hi_rank = np.apply_along_axis(rankdata, 1, array, 'ordinal')
    lo_hi_rank = lo_hi_rank.astype(float)
    lo_hi_rank[np.isnan(array)] = np.NaN
    if order == 'lo-hi':
        return lo_hi_rank
    else:
        return np.nanmax(lo_hi_rank, axis=1, keepdims=True) - lo_hi_rank + 1

有人知道更快的实现吗?


更新

我已经比较了目前建议的所有选项的执行时间。

下面的选项 1 是我上面建议的代码的显式循环版本(在下面作为选项 2 重复)

def option1(a, order='ascending'):
    ranks = np.empty_like(a)
    for row in range(ranks.shape[0]):
        lo_hi_rank = rankdata(a[row], method='ordinal')
        lo_hi_rank = lo_hi_rank.astype(float)
        lo_hi_rank[np.isnan(a[row])] = np.NaN
        if order == 'ascending':
            ranks[row] = lo_hi_rank.copy()
        else:
            ranks[row] = np.nanmax(lo_hi_rank) - lo_hi_rank + 1
    return ranks

def option2(a, order='ascending'):
    a = np.asarray(a)
    lo_hi_rank = np.apply_along_axis(rankdata, 1, a, 'ordinal')
    lo_hi_rank = lo_hi_rank.astype(float)
    lo_hi_rank[np.isnan(a)] = np.NaN
    if order == 'ascending':
        return lo_hi_rank
    else:
        return np.nanmax(lo_hi_rank, axis=1, keepdims=True) - lo_hi_rank + 1

选项 3-6 由 Divakar 建议:

def option3(a, order='ascending'):
    na = np.isnan(a)
    sm = na.sum(1,keepdims=True)
    
    if order=='descending':
        b = np.where(np.isnan(a), -np.inf, -a)
    else:
        b = np.where(np.isnan(a), -np.inf,a)
    
    out = b.argsort(1,'stable').argsort(1)+1. - sm
    out[out<=0] = np.nan
    return out

def option4(a, order='ascending'):
    na = np.isnan(a)
    sm = na.sum(1,keepdims=True)

    if order=='descending':
        b = np.where(np.isnan(a), -np.inf, -a)
    else:
        b = np.where(np.isnan(a), -np.inf,a)

    idx = b.argsort(1,'stable')
    m,n = idx.shape
    sidx = np.empty((m,n), dtype=float)
    np.put_along_axis(sidx, idx,np.arange(1,n+1), axis=1)
    
    out = sidx - sm
    out[out<=0] = np.nan
    return out

def option5(a, order='descending'):
    b = -a if order=='descending' else a        
    out = b.argsort(1,'stable').argsort(1)+1.
    return np.where(np.isnan(a), np.nan, out)

def option6(a, order='descending'):
    b = -a if order=='descending' else a        
    idx = b.argsort(1,'stable')
    m,n = idx.shape
    out = np.empty((m,n), dtype=float)
    np.put_along_axis(out, idx,np.arange(1,n+1), axis=1)
    return np.where(np.isnan(a), np.nan, out)

选项 6 似乎是最干净的,而且确实是最快的(与选项 2 相比提高了约 40%)。请参阅下面 100 次迭代的平均执行时间,其中 array.shape=(5348,1225)

>> TIME COMPARISON
>> 100 iterations | array.shape=(5348, 1225)

>> Option1: 0.4838 seconds
>> Option2: 0.3404 seconds
>> Option3: 0.3355 seconds
>> Option4: 0.2331 seconds
>> Option5: 0.3145 seconds
>> Option6: 0.2114 seconds

它也可以扩展到通用轴和通用 n-dim 数组,如 Divakar 所提出的。但是,对于我想要实现的目标来说,它仍然太耗时了(因为我必须 运行 这个函数在一个循环中数百万次)。有更快的选择吗?或者我们是否达到了 Python 的可行性?

方法一

这是一种方法 -

def rank_with_nans(a, order='descending'):
    na = np.isnan(a)
    sm = na.sum(1,keepdims=True)
    
    if order=='descending':
        b = np.where(np.isnan(a), -np.inf, -a)
    else:
        b = np.where(np.isnan(a), -np.inf,a)
    
    out = b.argsort(1,'stable').argsort(1)+1. - sm
    out[out<=0] = np.nan
    return out

我们可以基于 对双 argsort 部分进行优化,如下所示 -

def rank_with_nans_v2(a, order='descending'):
    na = np.isnan(a)
    sm = na.sum(1,keepdims=True)
    
    if order=='descending':
        b = np.where(np.isnan(a), -np.inf, -a)
    else:
        b = np.where(np.isnan(a), -np.inf,a)

    idx = b.argsort(1,'stable')
    m,n = idx.shape
    sidx = np.empty((m,n), dtype=float)
    np.put_along_axis(sidx, idx,np.arange(1,n+1), axis=1)
    
    out = sidx - sm
    out[out<=0] = np.nan
    return out

样本 运行s -

In [338]: a
Out[338]: 
array([[4.32, 6.43, 4.32, 2.21],
       [0.65,  nan, 8.12, 6.43],
       [ nan, 4.32, 1.23, 1.23]])

In [339]: rank_with_nans(a, order='descending')
Out[339]: 
array([[ 2.,  1.,  3.,  4.],
       [ 3., nan,  1.,  2.],
       [nan,  1.,  2.,  3.]])

In [340]: rank_with_nans(a, order='ascending')
Out[340]: 
array([[ 2.,  4.,  3.,  1.],
       [ 1., nan,  3.,  2.],
       [nan,  3.,  1.,  2.]])

方法#2

没有inf转换,这里有double-argsort-

def rank_with_nans_v3(a, order='descending'):
    b = -a if order=='descending' else a        
    out = b.argsort(1,'stable').argsort(1)+1.
    return np.where(np.isnan(a), np.nan, out)

再次使用 argsort-skip 技巧 -

def rank_with_nans_v4(a, order='descending'):
    b = -a if order=='descending' else a        
    idx = b.argsort(1,'stable')
    m,n = idx.shape
    out = np.empty((m,n), dtype=float)
    np.put_along_axis(out, idx,np.arange(1,n+1), axis=1)
    return np.where(np.isnan(a), np.nan, out)

奖励:扩展到通用轴和通用 n-dim 数组

我们可以扩展提议的解决方案以纳入 axis,以便可以沿该轴应用排名。最后一个解决方案 v4 似乎是最有效的解决方案。让我们用它来使它通用 -

def rank_with_nans_along_axis(a, order='descending', axis=-1):
    b = -a if order=='descending' else a
    idx = b.argsort(axis=axis, kind='stable')
    out = np.empty(idx.shape, dtype=float)
    indexer = tuple([None if i!=axis else Ellipsis for i in range(a.ndim)])
    np.put_along_axis(out, idx, np.arange(1,a.shape[axis]+1, dtype=float)[indexer], axis=axis)
    return np.where(np.isnan(a), np.nan, out)

样本运行-

In [227]: a
Out[227]: 
array([[4.32, 6.43, 4.32, 2.21],
       [0.65,  nan, 8.12, 6.43],
       [ nan, 4.32, 1.23, 1.23]])

In [228]: rank_with_nans_along_axis(a, order='descending',axis=0)
Out[228]: 
array([[ 1.,  1.,  2.,  2.],
       [ 2., nan,  1.,  1.],
       [nan,  2.,  3.,  3.]])

In [229]: rank_with_nans_along_axis(a, order='ascending',axis=0)
Out[229]: 
array([[ 2.,  2.,  2.,  2.],
       [ 1., nan,  3.,  3.],
       [nan,  1.,  1.,  1.]])