提高性能(向量化?)pandas.groupby.aggregate

Improve performances (vectorize?) pandas.groupby.aggregate

我正在尝试使用自定义聚合函数提高 pandas.groupby.aggregate 操作的性能。我注意到 - 如果我错了请纠正我 - pandas 按顺序在每个块上调用聚合函数(我怀疑它是一个简单的 for 循环)。

由于 pandas 很大程度上基于 numpy,有没有办法使用 numpy 的矢量化功能来加速计算?

我的代码

在我的代码中,我需要将风数据平均样本汇总在一起。虽然平均风速是微不足道的,但平均风向需要更特别的代码(例如,1deg 和 359deg 的平均值是 0deg,而不是 180deg)。

我的聚合函数的作用是:

  1. 删除 NaN
  2. return 如果不存在其他值,则为 NaN
  3. 检查是否存在指示可变风向的特殊标志。如果是,return flag
  4. 平均风向 vector-averaging algorithm

函数为:

def meandir(x):
    '''
    Parameters
    ----------
    x : pandas.Series
        pandas series to be averaged

    Returns
    -------
    float
        averaged wind direction
    '''

    # Removes the NaN from the recording
    x = x.dropna()

    # If the record is empty, return NaN
    if len(x)==0:
        return np.nan

    # If the record contains variable samples (990) return variable (990)
    elif np.any(x == 990):
        return 990

    # Otherwise sum the vectors and return the angle
    else:
        angle = np.rad2deg(
                           np.arctan2(
                                   np.sum(np.sin(np.deg2rad(x))),
                                   np.sum(np.cos(np.deg2rad(x)))
                                     )
                          )

        #Wrap angles from (-pi,pi) to (0,360)
        return (angle + 360) % 360

你可以用

测试一下
from timeit import repeat
import pandas as pd
import numpy as np

N_samples = int(1e4)
N_nan = N_var = int(0.02 * N_samples)

# Generate random data
data = np.random.rand(N_samples,2) * [30, 360]
data[np.random.choice(N_samples, N_nan), 1] = np.nan
data[np.random.choice(N_samples, N_var), 1] = 990

# Create dataset
df = pd.DataFrame(data, columns=['WindSpeed', 'WindDir'])
df.index = pd.date_range(start='2000-01-01 00:00', periods=N_samples, freq='10min')

# Run groupby + aggregate
grouped = df.groupby(pd.Grouper(freq='H'))   # Data from 14.30 to 15.29 are rounded to 15.00
aggfuns1 = {'WindSpeed': np.mean, 'WindDir':meandir}
aggfuns2 = {'WindSpeed': np.mean, 'WindDir':np.mean}

res = repeat(stmt='grouped.agg(aggfuns1)', globals=globals(), number=1, repeat=10)
print(f'With custom aggregating function {min(res)*1000:.2f} ms')

res = repeat(stmt='grouped.agg(aggfuns2)', globals=globals(), number=1, repeat=10)
print(f'Without custom aggregating function {min(res)*1000:.2f} ms')

在我的电脑上 N_samples=1e4 输出:

With custom aggregating function 1500.79 ms
Without custom aggregating function 2.08 ms

自定义聚合函数慢了 750 倍 N_samples=1e6 输出:

With custom aggregating function 142967.17 ms
Without custom aggregating function 21.92 ms

自定义聚合函数慢了 6500 倍!

有没有办法加快这行代码的速度?

关键是尝试对整体上的所有内容进行矢量化 df,并让 groupby 仅使用内置方法。

这是一种方法。诀窍是将角度转换为复数,numpy 会很乐意求和 (还有 groupby,但是 groupby 会拒绝 mean())。因此,我们将角度转换为 complexsum,然后 转换回角度。在您的代码中使用了相同的角度“有趣平均值”,并在您引用的维基百科页面上进行了描述。

关于特殊值(990)的处理,它也可以向量化:比较s.groupby(...).count().replace(val, nan).groupby(...).count()找到至少有一个的所有组.

无论如何,这里是:

def to_complex(s):
    return np.exp(np.deg2rad(s) * 1j)

def to_angle(s):
    return np.angle(s, deg=True) % 360

def mask_val(s, grouper, val=990):
    return s.groupby(grouper).count() != s.replace(val, np.nan).groupby(grouper).count()

def myagg(df, grouper, val=990, winddir='WindDir'):
    s = df[winddir]
    mask = mask_val(s, grouper, val)
    gb = to_complex(s).groupby(grouper)
    s = gb.sum()
    cnt = gb.count()
    s = to_angle(s) * (cnt / cnt)  # put NaN where all NaNs
    s[mask] = val
    
    # other columns
    agg = df.groupby(grouper).mean()
    agg[winddir] = s

    return agg

申请:

为了方便起见,我把你的示例生成放到了一个函数中gen_example(N_samples)

df = gen_example(50)
myagg(df, pd.Grouper(freq='H'))

Out[ ]:
                     WindSpeed     WindDir
2000-01-01 00:00:00  12.991717  354.120464
2000-01-01 01:00:00  15.743056   60.813629
2000-01-01 02:00:00  14.593927  245.487383
2000-01-01 03:00:00  17.836368  131.493675
2000-01-01 04:00:00  18.987296   27.150359
2000-01-01 05:00:00  16.415725  194.923399
2000-01-01 06:00:00  20.881816  990.000000
2000-01-01 07:00:00  15.033480   44.626018
2000-01-01 08:00:00  16.276834   29.252459

速度:

df = gen_example(10_000)
%timeit myagg(df, pd.Grouper(freq='H'))

Out[ ]:
6.76 ms ± 12.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

df = gen_example(1e6)
%timeit myagg(df, pd.Grouper(freq='H'))

Out[ ]:
189 ms ± 425 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

测试:

idx = [0] * 4
grouper = pd.Grouper(level=0)

myagg(pd.DataFrame({'WindDir': [170, 170, 178, 182]}, index=idx), grouper)
      WindDir
0  174.998473

myagg(pd.DataFrame({'WindDir': [330, 359, 1, 40]}, index=idx), grouper)
    WindDir
0  2.251499

myagg(pd.DataFrame({'WindDir': [330, 359, 1, np.nan]}, index=idx), grouper)
      WindDir
0  350.102878

myagg(pd.DataFrame({'WindDir': [np.nan, np.nan, np.nan, np.nan]}, index=idx), grouper)
   WindDir
0      NaN

myagg(pd.DataFrame({'WindDir': [330, 990, 1, np.nan]}, index=idx), grouper)
   WindDir
0    990.0