使用 pandas 滚动统计模型协整
Rolling statsmodels cointegration using pandas
我有一个包含两个系列的 DataFrame,我知道如何使用所有数据点获得它们的协整...
import pandas as pd
import numpy as np
import statsmodels.tsa.stattools as ts
A = pd.Series(np.cumsum(np.random.normal(size=100)) + 50)
B = pd.Series(A + 5 + np.random.normal(size=100))
ts.coint(A, B)
但是,我想通过滚动 window(假设 60 天)来探索这种协整随时间发生的变化。我如何结合使用 statsmodels 和 pandas?
来实现这一点
提前致谢!
您可以通过首先创建一个数据框,分配一个整数位置序列,然后使用 pandas rolling
函数和 lambda 函数来实现这一点,该函数提取 ts.coint
的第一个元素的 return.
所以修改你的代码我们得到:
import pandas as pd
import numpy as np
import statsmodels.tsa.stattools as ts
A = pd.Series(np.cumsum(np.random.normal(size=1000)) + 50, name='A')
B = pd.Series(A + 5 + np.random.normal(size=1000), name='B')
df = pd.concat([A, B], axis=1)
df['ii'] = range(len(df))
df['ii'].rolling(100).apply(lambda ii: ts.coint(df.loc[ii, 'A'], df.loc[ii, 'B'])[0])
为了说明这一点,我将系列的大小增加到 1000,并将滚动 window 设置为 100(但您可以使用 rolling
中的选项)。
鉴于两个系列的长度相同,您可以使用 Series.index.map()
来完成。如果系列 A 和 B 从 0 开始索引:
C = A.index.map(lambda i: ts.coint(A[i-60:i], B[i-60:i])[0] if i >= 60 else np.nan)
如果系列索引是其他东西(即日期时间),您需要先重置索引并使用 iloc
进行定位:
C = A.reset_index().index.map(lambda i: ts.coint(A.iloc[i-60:i], B.iloc[i-60:i])[0] if i >= 60 else np.nan)
我有一个包含两个系列的 DataFrame,我知道如何使用所有数据点获得它们的协整...
import pandas as pd
import numpy as np
import statsmodels.tsa.stattools as ts
A = pd.Series(np.cumsum(np.random.normal(size=100)) + 50)
B = pd.Series(A + 5 + np.random.normal(size=100))
ts.coint(A, B)
但是,我想通过滚动 window(假设 60 天)来探索这种协整随时间发生的变化。我如何结合使用 statsmodels 和 pandas?
来实现这一点提前致谢!
您可以通过首先创建一个数据框,分配一个整数位置序列,然后使用 pandas rolling
函数和 lambda 函数来实现这一点,该函数提取 ts.coint
的第一个元素的 return.
所以修改你的代码我们得到:
import pandas as pd
import numpy as np
import statsmodels.tsa.stattools as ts
A = pd.Series(np.cumsum(np.random.normal(size=1000)) + 50, name='A')
B = pd.Series(A + 5 + np.random.normal(size=1000), name='B')
df = pd.concat([A, B], axis=1)
df['ii'] = range(len(df))
df['ii'].rolling(100).apply(lambda ii: ts.coint(df.loc[ii, 'A'], df.loc[ii, 'B'])[0])
为了说明这一点,我将系列的大小增加到 1000,并将滚动 window 设置为 100(但您可以使用 rolling
中的选项)。
鉴于两个系列的长度相同,您可以使用 Series.index.map()
来完成。如果系列 A 和 B 从 0 开始索引:
C = A.index.map(lambda i: ts.coint(A[i-60:i], B[i-60:i])[0] if i >= 60 else np.nan)
如果系列索引是其他东西(即日期时间),您需要先重置索引并使用 iloc
进行定位:
C = A.reset_index().index.map(lambda i: ts.coint(A.iloc[i-60:i], B.iloc[i-60:i])[0] if i >= 60 else np.nan)