Pandas rolling moving average using table method and time period

I have some very slow code that computes multiple rolling means over different time periods (e.g. 3 to 1260 days, i.e. up to 5 years). It is very slow and somewhat memory-inefficient, which matters because I am applying it to the full price history of roughly 50,000 globally listed companies (about 10 columns). I recently learned about the 'table' method introduced in pandas 1.3, and I'm wondering whether it can be used to make my code faster and, ideally, vectorized.

Ideally, I'd also like to ensure the data is contiguous, i.e. that I am not computing rolling means over stale data. For example, if months, weeks, or even years of data are missing, I obviously don't want that data included in the shorter moving averages: I want the 22-day moving average to correspond more or less to an average over the last month, not over any longer span. I gather this kind of windowing is also possible in pandas 1.3, although I don't believe it existed earlier (when I originally wrote the code).
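One way to approach the staleness concern (this is a sketch on toy data, not part of the original code): pandas rolling also accepts a time-based offset window such as '22D', which counts calendar days rather than rows, so a gap in the data automatically leaves fewer observations in the window instead of pulling in stale rows:

```python
import pandas as pd

# Toy frame with a large gap between the second and third observation.
df = pd.DataFrame({
    "date": pd.to_datetime(["2021-01-01", "2021-01-02", "2021-03-01"]),
    "price": [1.0, 2.0, 10.0],
})

# A '22D' window averages only rows whose dates fall within the last
# 22 calendar days (the 'date' column must be monotonically increasing).
out = df.rolling("22D", on="date")["price"].mean()

# The 2021-03-01 row averages only itself: the January rows lie outside
# the 22-day window, so the stale data is excluded automatically.
# min_periods can additionally be set to require a minimum coverage.
```

Note that with offset windows `min_periods` defaults to 1, so a coverage requirement like the 80% rule in the code below would still need to be stated explicitly.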

A reproducible example:

import numpy as np
import pandas as pd
from datetime import datetime
from pandas.tseries.offsets import Day


df = pd.DataFrame(dict(
    efcode = np.random.randint(0, 2, size=10000),
    date = pd.date_range(start=datetime(1990, 1, 1), end=datetime(1990, 1, 1) + Day(9999), freq='D'),
    liq_daily = np.random.randint(1, 100, size=10000),
    liq_daily_usd = np.random.randint(1, 100, size=10000),
    net_vwap_avg = np.random.randint(1, 100, size=10000)
))

proc_list = ['liq_daily', 'liq_daily_usd', 'net_vwap_avg']
for p in [3, 5, 10, 22, 45, 67, 125, 252, 504, 756, 1260]:
    # 0.8 to ensure that at least 80% of the required data is present
    df[[f"{q}_{p}d" for q in proc_list]] = df.groupby('efcode')[proc_list].transform(
        lambda x: x.rolling(p, min_periods=int(0.8 * p)).mean())

Also, a helpful soul, @Doraelee, requested actual data. So please find below a 60-day excerpt for 2 stocks:

       efcode       date  liq_daily_usd     liq_daily  net_vwap_avg
2813  31160EF 2021-06-28   2.504318e+07  2.100057e+07  1.032595e+07
3116  40712SS 2021-06-29   3.160801e+04  2.706165e+05  1.965338e+05
2814  31160EF 2021-06-29   2.796293e+07  2.350023e+07  2.499666e+06
3117  40712SS 2021-06-30   1.381836e+05  1.187144e+06  9.165275e+05
2815  31160EF 2021-06-30   2.368039e+07  1.997603e+07 -3.524202e+06
2816  31160EF 2021-07-01   2.801560e+07  2.364225e+07  3.666223e+06
3118  40712SS 2021-07-01   3.228064e+05  2.780417e+06  1.593273e+06
2817  31160EF 2021-07-02   1.381241e+07  1.165624e+07 -1.198601e+07
3119  40712SS 2021-07-02   2.230596e+05  1.921271e+06 -8.591462e+05
2818  31160EF 2021-07-05   7.322225e+06  6.173102e+06 -5.483137e+06
3120  40712SS 2021-07-05   2.377546e+04  2.042565e+05 -1.717014e+06
3121  40712SS 2021-07-06   6.652910e+04  5.745172e+05  3.702608e+05
2819  31160EF 2021-07-06   1.878950e+07  1.589636e+07  9.723256e+06
3122  40712SS 2021-07-07   9.590548e+04  8.310700e+05  2.565528e+05
2820  31160EF 2021-07-07   1.300278e+07  1.102641e+07 -4.869949e+06
3123  40712SS 2021-07-08   3.023052e+04  2.610580e+05 -5.700120e+05
2821  31160EF 2021-07-08   2.374049e+07  2.004245e+07  9.016044e+06
3124  40712SS 2021-07-09   3.017735e+04  2.599255e+05 -1.132500e+03
2822  31160EF 2021-07-09   2.519673e+07  2.121902e+07  1.176562e+06
3125  40712SS 2021-07-12   3.026381e+03  2.611200e+04 -2.338135e+05
2823  31160EF 2021-07-12   1.963135e+07  1.655062e+07 -4.668400e+06
3126  40712SS 2021-07-13   1.675856e+03  1.456000e+04 -1.155200e+04
2824  31160EF 2021-07-13   1.897174e+07  1.610682e+07 -4.438000e+05
3127  40712SS 2021-07-14   7.188402e+04  6.223725e+05  6.078125e+05
2825  31160EF 2021-07-14   1.741195e+07  1.471287e+07 -1.393943e+06
3128  40712SS 2021-07-15   1.457851e+03  1.268800e+04 -6.096845e+05
2826  31160EF 2021-07-15   1.929207e+07  1.633523e+07  1.622360e+06
3129  40712SS 2021-07-16   2.724538e+04  2.375360e+05  2.248480e+05
2827  31160EF 2021-07-16   1.399231e+07  1.185287e+07 -4.482365e+06
2828  31160EF 2021-07-19   2.799721e+07  2.373188e+07  1.187901e+07
3130  40712SS 2021-07-19   5.629003e+04  4.916160e+05  2.540800e+05
3131  40712SS 2021-07-20   5.944235e+04  5.196010e+05  2.798500e+04
2829  31160EF 2021-07-20   1.913041e+07  1.623974e+07 -7.492146e+06
3132  40712SS 2021-07-21   2.807122e+04  2.447360e+05 -2.748650e+05
2830  31160EF 2021-07-21   2.461993e+07  2.087425e+07  4.634518e+06
3133  40712SS 2021-07-22   3.287703e+04  2.866350e+05  4.189900e+04
2831  31160EF 2021-07-22   2.213531e+07  1.880655e+07 -2.067700e+06
2832  31160EF 2021-07-23            NaN           NaN           NaN
3134  40712SS 2021-07-23   3.051708e+03  2.660600e+04 -2.600290e+05
3141  31160EF 2021-07-26   1.702820e+07  1.442701e+07 -3.147277e+06
3146  40712SS 2021-07-26   3.216440e+04  2.787210e+05  2.521150e+05
3147  40712SS 2021-07-27   4.222169e+04  3.658725e+05  8.715150e+04
3142  31160EF 2021-07-27   3.842427e+07  3.251885e+07  1.809184e+07
3143  31160EF 2021-07-28   3.321394e+07  2.804994e+07 -4.468907e+06
3148  40712SS 2021-07-28   5.818368e+04  5.028840e+05  1.370115e+05
3149  40712SS 2021-07-29   2.889200e+01  2.480000e+02 -5.026360e+05
3144  31160EF 2021-07-29   2.673023e+07  2.248468e+07 -5.565267e+06
3145  31160EF 2021-07-30   3.174899e+07  2.670631e+07  4.221629e+06
3150  40712SS 2021-07-30   8.643426e+03  7.419250e+04  7.394450e+04
3135  40712SS 2021-08-02   8.754480e+03  7.560000e+04  1.407500e+03
2833  31160EF 2021-08-02   1.883054e+07  1.586344e+07 -1.084286e+07
3136  40712SS 2021-08-03   5.831280e+03  5.040000e+04 -2.520000e+04
2834  31160EF 2021-08-03   1.541283e+07  1.299258e+07 -2.870867e+06
3137  40712SS 2021-08-04   1.823021e+04  1.575645e+05  1.071645e+05
2835  31160EF 2021-08-04   1.330210e+07  1.123963e+07 -1.752944e+06
3138  40712SS 2021-08-05   2.296201e+04  1.986333e+05  4.106885e+04
2836  31160EF 2021-08-05   1.577779e+07  1.333485e+07  2.095215e+06
2837  31160EF 2021-08-06   1.180542e+07  1.003777e+07 -3.297073e+06
3139  40712SS 2021-08-06   1.133002e+04  9.869352e+04 -9.993982e+04
3140  40712SS 2021-08-09   6.406671e+03  5.610045e+04 -4.259307e+04

Update

@Doraelee's answer is good: simple, and gets the job done quickly. After more thought and experimentation, I now realize that method='table' is better suited to very wide DataFrames, i.e. ones with many columns. For a narrow frame like yours (3 columns), the performance gain from the vectorization and parallelism that method='table' applies across columns is negligible.

I've included some more code below for benchmarking. You'll notice that the speed-up on the wide frame is larger than on the narrow frame. In fact, method='table' can be slower on narrow frames (as you noticed yourself), and not only because of compilation overhead. Perhaps Numba can be configured to avoid this slowdown, or perhaps pandas simply doesn't have a robust implementation for this case yet; I don't know.

Note that it's hard to compare the wide timings with the narrow timings directly, since (i) the group lengths differ and (ii) the Numba calls run in parallel, whereas my vanilla pandas rolling.mean appears to run sequentially by default.

import numpy as np
import pandas as pd
from datetime import datetime


# it's tough to get apples-to-apples for wide vs narrow comparisons with parallelism on

num_wide_rows = 10 ** 4
num_wide_cols = 10 ** 4

num_narrow_cols = 10
num_narrow_rows = 10 ** 5

# seed generator
np.random.seed(22)
rolling_period = 14
min_periods = int(0.8 * rolling_period)

# create wide DF
wide_group_id_list = np.random.randint(low=1, high=10+1, size=num_wide_rows) # 10 possible groups
wide_group_id_list.sort()
wide_data = np.random.rand(num_wide_rows, num_wide_cols)
wide_df = pd.DataFrame(data=wide_data)
wide_df.insert(0, 'group_id', wide_group_id_list)

# create narrow DF
narrow_group_id_list = np.random.randint(low=1, high=10+1, size=num_narrow_rows) # 10 possible groups
narrow_group_id_list.sort()
narrow_data = np.random.rand(num_narrow_rows, num_narrow_cols)
narrow_df = pd.DataFrame(data=narrow_data)
narrow_df.insert(0, 'group_id', narrow_group_id_list)

def time_operation(title, df, method='single'):
    kwargs = {'engine': 'numba'} if method == 'table' else {}
    t_begin = datetime.now()
    for i in range(3): # repetitions
        df.groupby('group_id').rolling(rolling_period, min_periods=min_periods, method=method).mean(**kwargs)
    t_final = datetime.now()
    delta_t = t_final - t_begin
    print(f"'{title}' took {delta_t}.")

# (this step may be unnecessary) perform a cheap rolling mean in the hopes of smart, one-off precompilation for the timing test
narrow_df.head(2*min_periods).groupby('group_id').rolling(rolling_period, min_periods=min_periods, method='table').mean(engine='numba')

# timing experiment
time_operation('wide_df/method=single', wide_df, method='single')
time_operation('wide_df/method=table', wide_df, method='table')
time_operation('narrow_df/method=single', narrow_df, method='single')
time_operation('narrow_df/method=table', narrow_df, method='table')

Output on my laptop:

'wide_df/method=single' took 0:00:47.604131.
'wide_df/method=table' took 0:00:15.580090.
'narrow_df/method=single' took 0:00:00.365677.
'narrow_df/method=table' took 0:00:04.876920.

Original answer

Here is some code that reproduces the results of your example while using the table method:

import numpy as np
import pandas as pd
from datetime import datetime
from pandas.tseries.offsets import Day

np.random.seed(22)  # basic reproducibility
df = pd.DataFrame(dict(
    efcode = np.random.randint(0, 2, size=10000),
    date = pd.date_range(start=datetime(1990, 1, 1), end=datetime(1990, 1, 1) + Day(9999), freq='D'),
    liq_daily = np.random.randint(1, 100, size=10000),
    liq_daily_usd = np.random.randint(1, 100, size=10000),
    net_vwap_avg = np.random.randint(1, 100, size=10000)
))

proc_list = ['liq_daily', 'liq_daily_usd', 'net_vwap_avg']

# sort by efcode first, date next
df = df.sort_values(by=['efcode', 'date']).reset_index(drop=True)
    
for p in [3, 5, 10, 22, 45, 67, 125, 252, 504, 756, 1260]:
    df[[f"{q}_{p}d" for q in proc_list]] = (
        df
        .groupby('efcode')[proc_list]
        .rolling(p, min_periods=int(0.8 * p), method='table')
        .mean(engine='numba')
        .reset_index(drop=True)
    )

Caveats:

  • For an apples-to-apples comparison, sort the DataFrame appropriately in your example as well
  • According to the documentation, the table method can currently only be used with the numba engine
    • This means that for smaller DataFrames, the compilation overhead can actually slow the code down, as in this example; hopefully you'll see a speed-up on your larger dataset
  • efcode is retained as an index as a result of the groupby operation, which is why I reset and drop the index in the final step
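To illustrate the last point on toy data (not the original frame): groupby().rolling() returns a result indexed by (group key, original row label), so the index has to be dropped before assigning back, and the rows only line up with the source frame because it was sorted by group first:

```python
import pandas as pd

# toy frame, already sorted by the group column
df = pd.DataFrame({"g": [0, 0, 1, 1], "x": [1.0, 2.0, 3.0, 4.0]})

rolled = df.groupby("g")["x"].rolling(2, min_periods=1).mean()
# rolled carries a MultiIndex (group key, original row label)
print(rolled.index.names)  # ['g', None]

# drop the MultiIndex so the values can be assigned back positionally;
# this is only safe because df is sorted by 'g'
flat = rolled.reset_index(drop=True)
```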

I think @Salmonstrikes's code does the job, but it's not fast. In this case you shouldn't use transform, the 'table' method, or 'numba' acceleration at all; you just need a plain rolling mean. So I stripped those elements out of @Salmonstrikes's code, and it runs faster.

import numpy as np
import pandas as pd
from datetime import datetime
from pandas.tseries.offsets import Day
import time


df = pd.DataFrame(dict(
    efcode = np.random.randint(0, 2, size=10000),
    date = pd.date_range(start=datetime(1990, 1, 1), end=datetime(1990, 1, 1) + Day(9999), freq='D'),
    liq_daily = np.random.randint(1, 100, size=10000),
    liq_daily_usd = np.random.randint(1, 100, size=10000),
    net_vwap_avg = np.random.randint(1, 100, size=10000)
))

proc_list = ['liq_daily', 'liq_daily_usd', 'net_vwap_avg']

start_time=time.time()
for p in [3, 5, 10, 22, 45, 67, 125, 252, 504, 756, 1260]:
    df[[(q + '_' + str(p) + 'd') for q in proc_list]] = df.groupby('efcode')[proc_list].transform(lambda x: x.rolling(p, min_periods=int( 0.8 * p)).mean())
end_time=time.time()

print(f"Your original execution time is: {end_time-start_time}")

df = df.sort_values(by=['efcode', 'date']).reset_index(drop=True)

df2=df.copy()

proc_list = ['liq_daily', 'liq_daily_usd', 'net_vwap_avg']

start_time=time.time()

for p in [3, 5, 10, 22, 45, 67, 125, 252, 504, 756, 1260]:
    df2[[f"{q}_{p}d" for q in proc_list]] = (
        df2
        .groupby('efcode')[proc_list]
        .rolling(p, min_periods=int(0.8 * p))
        .mean()
        .reset_index(drop=True)
    )
end_time=time.time()

print(f"My execution time is: {end_time-start_time}")

start_time=time.time()

for p in [3, 5, 10, 22, 45, 67, 125, 252, 504, 756, 1260]:
    df[[f"{q}_{p}d" for q in proc_list]] = (
        df
        .groupby('efcode')[proc_list]
        .rolling(p, min_periods=int(0.8 * p), method='table')
        .mean(engine='numba')
        .reset_index(drop=True)
    )
end_time=time.time()

print(f"Salmonstrikes execution time is: {end_time-start_time}")
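One sanity check worth adding before relying on the faster variant (this is a sketch on toy data, not part of the original answers): verify that the plain groupby-rolling approach produces the same numbers as the original transform approach.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
# toy frame, sorted by group so that positional alignment is valid
df = pd.DataFrame({
    "efcode": np.sort(rng.integers(0, 2, size=200)),
    "liq_daily": rng.random(200),
})
p, mp = 5, 4

# original approach: transform preserves the frame's row order
via_transform = df.groupby("efcode")["liq_daily"].transform(
    lambda x: x.rolling(p, min_periods=mp).mean())

# faster approach: groupby-rolling, then drop the MultiIndex
via_rolling = (df.groupby("efcode")["liq_daily"]
                 .rolling(p, min_periods=mp).mean()
                 .reset_index(drop=True))

# equal_nan=True because the first rows of each group are NaN
ok = np.allclose(via_transform, via_rolling, equal_nan=True)
```

The positional comparison only holds because the frame is sorted by efcode, which is exactly why the answers above sort before using the groupby-rolling form.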