提高性能(向量化?)pandas.groupby.aggregate
Improve performances (vectorize?) pandas.groupby.aggregate
我正在尝试使用自定义聚合函数提高 pandas.groupby.aggregate
操作的性能。我注意到 - 如果我错了请纠正我 - pandas
按顺序在每个块上调用聚合函数(我怀疑它是一个简单的 for
循环)。
由于 pandas
很大程度上基于 numpy
,有没有办法使用 numpy
的矢量化功能来加速计算?
我的代码
在我的代码中,我需要将风数据平均样本汇总在一起。虽然平均风速是微不足道的,但平均风向需要更特别的代码(例如,1deg 和 359deg 的平均值是 0deg,而不是 180deg)。
我的聚合函数的作用是:
- 删除 NaN
- return 如果不存在其他值,则为 NaN
- 检查是否存在指示可变风向的特殊标志。如果是,return flag
- 平均风向 vector-averaging algorithm
函数为:
def meandir(x):
'''
Parameters
----------
x : pandas.Series
pandas series to be averaged
Returns
-------
float
averaged wind direction
'''
# Removes the NaN from the recording
x = x.dropna()
# If the record is empty, return NaN
if len(x)==0:
return np.nan
# If the record contains variable samples (990) return variable (990)
elif np.any(x == 990):
return 990
# Otherwise sum the vectors and return the angle
else:
angle = np.rad2deg(
np.arctan2(
np.sum(np.sin(np.deg2rad(x))),
np.sum(np.cos(np.deg2rad(x)))
)
)
#Wrap angles from (-pi,pi) to (0,360)
return (angle + 360) % 360
你可以用
测试一下
from timeit import repeat
import pandas as pd
import numpy as np
N_samples = int(1e4)
N_nan = N_var = int(0.02 * N_samples)
# Generate random data
data = np.random.rand(N_samples,2) * [30, 360]
data[np.random.choice(N_samples, N_nan), 1] = np.nan
data[np.random.choice(N_samples, N_var), 1] = 990
# Create dataset
df = pd.DataFrame(data, columns=['WindSpeed', 'WindDir'])
df.index = pd.date_range(start='2000-01-01 00:00', periods=N_samples, freq='10min')
# Run groupby + aggregate
grouped = df.groupby(pd.Grouper(freq='H')) # Data from 14.30 to 15.29 are rounded to 15.00
aggfuns1 = {'WindSpeed': np.mean, 'WindDir':meandir}
aggfuns2 = {'WindSpeed': np.mean, 'WindDir':np.mean}
res = repeat(stmt='grouped.agg(aggfuns1)', globals=globals(), number=1, repeat=10)
print(f'With custom aggregating function {min(res)*1000:.2f} ms')
res = repeat(stmt='grouped.agg(aggfuns2)', globals=globals(), number=1, repeat=10)
print(f'Without custom aggregating function {min(res)*1000:.2f} ms')
在我的电脑上 N_samples=1e4
输出:
With custom aggregating function 1500.79 ms
Without custom aggregating function 2.08 ms
自定义聚合函数慢了 750 倍
N_samples=1e6
输出:
With custom aggregating function 142967.17 ms
Without custom aggregating function 21.92 ms
自定义聚合函数慢了 6500 倍!
有没有办法加快这行代码的速度?
关键是尝试对整体上的所有内容进行矢量化 df
,并让 groupby
仅使用内置方法。
这是一种方法。诀窍是将角度转换为复数,numpy 会很乐意求和
(还有 groupby
,但是 groupby
会拒绝 mean()
)。因此,我们将角度转换为 complex
、sum
,然后
转换回角度。在您的代码中使用了相同的角度“有趣平均值”,并在您引用的维基百科页面上进行了描述。
关于特殊值(990
)的处理,它也可以向量化:比较s.groupby(...).count()
和.replace(val, nan).groupby(...).count()
找到至少有一个的所有组.
无论如何,这里是:
def to_complex(s):
return np.exp(np.deg2rad(s) * 1j)
def to_angle(s):
return np.angle(s, deg=True) % 360
def mask_val(s, grouper, val=990):
return s.groupby(grouper).count() != s.replace(val, np.nan).groupby(grouper).count()
def myagg(df, grouper, val=990, winddir='WindDir'):
s = df[winddir]
mask = mask_val(s, grouper, val)
gb = to_complex(s).groupby(grouper)
s = gb.sum()
cnt = gb.count()
s = to_angle(s) * (cnt / cnt) # put NaN where all NaNs
s[mask] = val
# other columns
agg = df.groupby(grouper).mean()
agg[winddir] = s
return agg
申请:
为了方便起见,我把你的示例生成放到了一个函数中gen_example(N_samples)
。
df = gen_example(50)
myagg(df, pd.Grouper(freq='H'))
Out[ ]:
WindSpeed WindDir
2000-01-01 00:00:00 12.991717 354.120464
2000-01-01 01:00:00 15.743056 60.813629
2000-01-01 02:00:00 14.593927 245.487383
2000-01-01 03:00:00 17.836368 131.493675
2000-01-01 04:00:00 18.987296 27.150359
2000-01-01 05:00:00 16.415725 194.923399
2000-01-01 06:00:00 20.881816 990.000000
2000-01-01 07:00:00 15.033480 44.626018
2000-01-01 08:00:00 16.276834 29.252459
速度:
df = gen_example(10_000)
%timeit myagg(df, pd.Grouper(freq='H'))
Out[ ]:
6.76 ms ± 12.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
df = gen_example(1e6)
%timeit myagg(df, pd.Grouper(freq='H'))
Out[ ]:
189 ms ± 425 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
测试:
idx = [0] * 4
grouper = pd.Grouper(level=0)
myagg(pd.DataFrame({'WindDir': [170, 170, 178, 182]}, index=idx), grouper)
WindDir
0 174.998473
myagg(pd.DataFrame({'WindDir': [330, 359, 1, 40]}, index=idx), grouper)
WindDir
0 2.251499
myagg(pd.DataFrame({'WindDir': [330, 359, 1, np.nan]}, index=idx), grouper)
WindDir
0 350.102878
myagg(pd.DataFrame({'WindDir': [np.nan, np.nan, np.nan, np.nan]}, index=idx), grouper)
WindDir
0 NaN
myagg(pd.DataFrame({'WindDir': [330, 990, 1, np.nan]}, index=idx), grouper)
WindDir
0 990.0
我正在尝试使用自定义聚合函数提高 pandas.groupby.aggregate
操作的性能。我注意到 - 如果我错了请纠正我 - pandas
按顺序在每个块上调用聚合函数(我怀疑它是一个简单的 for
循环)。
由于 pandas
很大程度上基于 numpy
,有没有办法使用 numpy
的矢量化功能来加速计算?
我的代码
在我的代码中,我需要将风数据平均样本汇总在一起。虽然平均风速是微不足道的,但平均风向需要更特别的代码(例如,1deg 和 359deg 的平均值是 0deg,而不是 180deg)。
我的聚合函数的作用是:
- 删除 NaN
- return 如果不存在其他值,则为 NaN
- 检查是否存在指示可变风向的特殊标志。如果是,return flag
- 平均风向 vector-averaging algorithm
函数为:
def meandir(x):
'''
Parameters
----------
x : pandas.Series
pandas series to be averaged
Returns
-------
float
averaged wind direction
'''
# Removes the NaN from the recording
x = x.dropna()
# If the record is empty, return NaN
if len(x)==0:
return np.nan
# If the record contains variable samples (990) return variable (990)
elif np.any(x == 990):
return 990
# Otherwise sum the vectors and return the angle
else:
angle = np.rad2deg(
np.arctan2(
np.sum(np.sin(np.deg2rad(x))),
np.sum(np.cos(np.deg2rad(x)))
)
)
#Wrap angles from (-pi,pi) to (0,360)
return (angle + 360) % 360
你可以用
测试一下from timeit import repeat
import pandas as pd
import numpy as np
N_samples = int(1e4)
N_nan = N_var = int(0.02 * N_samples)
# Generate random data
data = np.random.rand(N_samples,2) * [30, 360]
data[np.random.choice(N_samples, N_nan), 1] = np.nan
data[np.random.choice(N_samples, N_var), 1] = 990
# Create dataset
df = pd.DataFrame(data, columns=['WindSpeed', 'WindDir'])
df.index = pd.date_range(start='2000-01-01 00:00', periods=N_samples, freq='10min')
# Run groupby + aggregate
grouped = df.groupby(pd.Grouper(freq='H')) # Data from 14.30 to 15.29 are rounded to 15.00
aggfuns1 = {'WindSpeed': np.mean, 'WindDir':meandir}
aggfuns2 = {'WindSpeed': np.mean, 'WindDir':np.mean}
res = repeat(stmt='grouped.agg(aggfuns1)', globals=globals(), number=1, repeat=10)
print(f'With custom aggregating function {min(res)*1000:.2f} ms')
res = repeat(stmt='grouped.agg(aggfuns2)', globals=globals(), number=1, repeat=10)
print(f'Without custom aggregating function {min(res)*1000:.2f} ms')
在我的电脑上 N_samples=1e4
输出:
With custom aggregating function 1500.79 ms
Without custom aggregating function 2.08 ms
自定义聚合函数慢了 750 倍
N_samples=1e6
输出:
With custom aggregating function 142967.17 ms
Without custom aggregating function 21.92 ms
自定义聚合函数慢了 6500 倍!
有没有办法加快这行代码的速度?
关键是尝试对整体上的所有内容进行矢量化 df
,并让 groupby
仅使用内置方法。
这是一种方法。诀窍是将角度转换为复数,numpy 会很乐意求和
(还有 groupby
,但是 groupby
会拒绝 mean()
)。因此,我们将角度转换为 complex
、sum
,然后
转换回角度。在您的代码中使用了相同的角度“有趣平均值”,并在您引用的维基百科页面上进行了描述。
关于特殊值(990
)的处理,它也可以向量化:比较s.groupby(...).count()
和.replace(val, nan).groupby(...).count()
找到至少有一个的所有组.
无论如何,这里是:
def to_complex(s):
return np.exp(np.deg2rad(s) * 1j)
def to_angle(s):
return np.angle(s, deg=True) % 360
def mask_val(s, grouper, val=990):
return s.groupby(grouper).count() != s.replace(val, np.nan).groupby(grouper).count()
def myagg(df, grouper, val=990, winddir='WindDir'):
s = df[winddir]
mask = mask_val(s, grouper, val)
gb = to_complex(s).groupby(grouper)
s = gb.sum()
cnt = gb.count()
s = to_angle(s) * (cnt / cnt) # put NaN where all NaNs
s[mask] = val
# other columns
agg = df.groupby(grouper).mean()
agg[winddir] = s
return agg
申请:
为了方便起见,我把你的示例生成放到了一个函数中gen_example(N_samples)
。
df = gen_example(50)
myagg(df, pd.Grouper(freq='H'))
Out[ ]:
WindSpeed WindDir
2000-01-01 00:00:00 12.991717 354.120464
2000-01-01 01:00:00 15.743056 60.813629
2000-01-01 02:00:00 14.593927 245.487383
2000-01-01 03:00:00 17.836368 131.493675
2000-01-01 04:00:00 18.987296 27.150359
2000-01-01 05:00:00 16.415725 194.923399
2000-01-01 06:00:00 20.881816 990.000000
2000-01-01 07:00:00 15.033480 44.626018
2000-01-01 08:00:00 16.276834 29.252459
速度:
df = gen_example(10_000)
%timeit myagg(df, pd.Grouper(freq='H'))
Out[ ]:
6.76 ms ± 12.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
df = gen_example(1e6)
%timeit myagg(df, pd.Grouper(freq='H'))
Out[ ]:
189 ms ± 425 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
测试:
idx = [0] * 4
grouper = pd.Grouper(level=0)
myagg(pd.DataFrame({'WindDir': [170, 170, 178, 182]}, index=idx), grouper)
WindDir
0 174.998473
myagg(pd.DataFrame({'WindDir': [330, 359, 1, 40]}, index=idx), grouper)
WindDir
0 2.251499
myagg(pd.DataFrame({'WindDir': [330, 359, 1, np.nan]}, index=idx), grouper)
WindDir
0 350.102878
myagg(pd.DataFrame({'WindDir': [np.nan, np.nan, np.nan, np.nan]}, index=idx), grouper)
WindDir
0 NaN
myagg(pd.DataFrame({'WindDir': [330, 990, 1, np.nan]}, index=idx), grouper)
WindDir
0 990.0