Compare two columns based on last N rows in a pandas DataFrame
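I want to groupby "ts_code" and, over the last N rows of each group, calculate the percentage change between the maximum of one column and the minimum of another column after that maximum. Specifically,
df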
ts_code high low
0 A 20 10
1 A 30 5
2 A 40 20
3 A 50 10
4 A 20 30
5 B 50 10
6 B 30 5
7 B 40 20
8 B 10 10
9 B 20 30
Goal
Below is my expected result
ts_code high low l3_high_low_pct_chg l4_high_low_pct_chg
0 A 20 10 NA NA
1 A 30 5 NA NA
2 A 40 20 0.5 NA
3 A 50 10 0.8 0.8
4 A 20 30 0.8 0.8
5 B 50 10 NA NA
6 B 30 5 NA NA
7 B 40 20 0.9 NA
8 B 10 10 0.75 0.9
9 B 20 30 0.75 0.75
ln_high_low_pct_chg (e.g. l3_high_low_pct_chg) = 1 - (minimum of the low column after the peak) / (maximum of the high column), computed over the last N rows for each group and each row.
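To make the formula concrete, here is a minimal sketch that evaluates it for a single window. The helper name pct_chg_after_peak is hypothetical, and the sketch assumes that "after the peak" includes the peak row itself, which is what the expected value 0.5 at index 2 implies.

```python
import numpy as np

def pct_chg_after_peak(high, low):
    """Hypothetical helper: 1 - min(low from the peak of high onwards) / max(high)."""
    high = np.asarray(high, dtype=float)
    low = np.asarray(low, dtype=float)
    peak = int(np.argmax(high))  # position of the first maximum of high
    # Assumption: the peak row itself counts as "after the peak".
    return float(1 - low[peak:].min() / high.max())

# Group B, rows 5..8 of the expected table (window of 4 ending at index 8).
window_high = [50, 30, 40, 10]
window_low = [10, 5, 20, 10]
print(pct_chg_after_peak(window_high, window_low))  # 0.9
```

The peak (50) is the first row of the window, so the minimum low is taken over all four rows (5), giving 1 - 5/50 = 0.9.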
Attempt and problem
df['l3_highest']=df.groupby('ts_code')['high'].transform(lambda x: x.rolling(3).max())
df['l3_lowest']=df.groupby('ts_code')['low'].transform(lambda x: x.rolling(3).min())
df['l3_high_low_pct_chg']=1-df['l3_lowest']/df['l3_highest']
But it fails: for the row at index 2, l3_lowest would be 5 rather than 20. I don't know how to calculate the percentage after the peak.
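The mismatch can be reproduced in a few lines. This is only an illustrative sketch on group A: it compares the plain rolling minimum of low with the minimum taken from the peak of high onwards, for the window ending at index 2. The variable names are made up for the example.

```python
import pandas as pd

df = pd.DataFrame({
    "ts_code": ["A"] * 5,
    "high": [20, 30, 40, 50, 20],
    "low": [10, 5, 20, 10, 30],
})

# Plain rolling minimum: looks at every low in the window, wherever the peak is.
naive_low = df.groupby("ts_code")["low"].transform(lambda x: x.rolling(3).min())

# Peak-aware minimum for the window ending at index 2 (rows 0..2).
window = df.iloc[0:3]
peak_pos = window["high"].to_numpy().argmax()       # the peak is the last row here
low_after_peak = window["low"].iloc[peak_pos:].min()

print(naive_low[2])     # 5.0
print(low_after_peak)   # 20
```

The two rolling windows are computed independently, so the low column has no way of knowing where the high column peaked.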
- For the last 4 rows: at index=8, high=50, low=5, l4_high_low_pct_chg=0.9; at index=9, high=40, low=10, l4_high_low_pct_chg=0.75.
- If the rolling window is 52, then for the hy_code 880912 group at index 1252, l52_high_low_pct_chg would be 0.281131, and for the 880301 group at index 1251, l52_high_low_pct_chg would be 0.321471.
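Grouping by 'ts_code' is just a plain groupby() call. DataFrame.rolling() hands your function one column at a time, so it is hard to apply when you need data from several columns. You can use "from numpy_ext import rolling_apply as rolling_apply_ext" for that. However, I first wrote a function that manually slices each group into sub-dataframes of length n and then applies a function to compute the value. idxmax() finds the index of the peak of the high column, and then we take the min() of the low values from that row onwards. The rest is straightforward.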
import numpy as np
import pandas as pd
df = pd.DataFrame([['A', 20, 10],
                   ['A', 30, 5],
                   ['A', 40, 20],
                   ['A', 50, 10],
                   ['A', 20, 30],
                   ['B', 50, 10],
                   ['B', 30, 5],
                   ['B', 40, 20],
                   ['B', 10, 10],
                   ['B', 20, 30]],
                  columns=['ts_code', 'high', 'low']
                  )

def custom_f(df, n):
    # Result for one group; rows without a full window stay NaN
    s = pd.Series(np.nan, index=df.index)

    def sub_f(df_):
        # Label of the row where 'high' peaks inside this window
        high_peak_idx = df_['high'].idxmax()
        # Minimum 'low' from the peak row (inclusive) to the end of the window
        min_low_after_peak = df_.loc[high_peak_idx:]['low'].min()
        max_high = df_['high'].max()
        return 1 - min_low_after_peak / max_high

    # Slide a window of n rows over the group, writing to the window's last row
    for i in range(df.shape[0] - n + 1):
        df_ = df.iloc[i:i + n]
        s.iloc[i + n - 1] = sub_f(df_)
    return s

df['l3_high_low_pct_chg'] = df.groupby("ts_code").apply(custom_f, 3).values
df['l4_high_low_pct_chg'] = df.groupby("ts_code").apply(custom_f, 4).values
print(df)
If you prefer to use the rolling function, this method gives the same output:
def rolling_f(rolling_df):
    # rolling passes one column of the window; use its index to fetch full rows
    df_ = df.loc[rolling_df.index]
    high_peak_idx = df_['high'].idxmax()
    min_low_after_peak = df_.loc[high_peak_idx:]["low"].min()
    max_high = df_['high'].max()
    return 1 - min_low_after_peak / max_high

df['l3_high_low_pct_chg'] = df.groupby("ts_code").rolling(3).apply(rolling_f).values[:, 0]
df['l4_high_low_pct_chg'] = df.groupby("ts_code").rolling(4).apply(rolling_f).values[:, 0]
print(df)
Finally, if you want a true rolling window calculation that avoids any index lookups, you can use numpy_ext (https://pypi.org/project/numpy-ext/)
from numpy_ext import rolling_apply
def np_ext_f(rolling_df, n):
    def rolling_apply_f(high, low):
        # high and low are NumPy arrays holding one window each
        return 1 - low[np.argmax(high):].min() / high.max()
    try:
        return pd.Series(
            rolling_apply(rolling_apply_f, n,
                          rolling_df['high'].values, rolling_df['low'].values),
            index=rolling_df.index)
    except ValueError:
        # Fall back to all NaN when rolling_apply rejects the input
        return pd.Series(np.nan, index=rolling_df.index)

df['l3_high_low_pct_chg'] = df.groupby('ts_code').apply(np_ext_f, n=3).sort_index(level=1).values
df['l4_high_low_pct_chg'] = df.groupby('ts_code').apply(np_ext_f, n=4).sort_index(level=1).values
print(df)
Output:
ts_code high low l3_high_low_pct_chg l4_high_low_pct_chg
0 A 20 10 NaN NaN
1 A 30 5 NaN NaN
2 A 40 20 0.50 NaN
3 A 50 10 0.80 0.80
4 A 20 30 0.80 0.80
5 B 50 10 NaN NaN
6 B 30 5 NaN NaN
7 B 40 20 0.90 NaN
8 B 10 10 0.75 0.90
9 B 20 30 0.75 0.75
For large datasets, the speed of these operations becomes an issue. So, to compare the speed of the different methods, I created a timing function:
import time
def timeit(f):
    def timed(*args, **kw):
        ts = time.time()
        result = f(*args, **kw)
        te = time.time()
        print('func:%r took: %2.4f sec' % (f.__name__, te - ts))
        return result
    return timed
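As a side note, a slightly more robust variant of this decorator can be sketched with time.perf_counter (a monotonic clock intended for measuring short durations) and functools.wraps (so the wrapped function keeps its own name). This is an optional alternative, not the version used for the numbers in this answer; slow_sum is a made-up function used only to exercise it.

```python
import functools
import time

def timeit(f):
    @functools.wraps(f)  # keep f.__name__ and docstring on the wrapper
    def timed(*args, **kw):
        start = time.perf_counter()
        result = f(*args, **kw)
        elapsed = time.perf_counter() - start
        print('func:%r took: %2.4f sec' % (f.__name__, elapsed))
        return result
    return timed

@timeit
def slow_sum(n):
    # Hypothetical workload, only here to show the decorator in use
    return sum(range(n))

total = slow_sum(1000)
```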
Next, let's make a large DataFrame by simply copying the existing DataFrame 500 times:
df = pd.concat([df for x in range(500)], axis=0)
df = df.reset_index()
Finally, we run the three methods under the timing function:
@timeit
def method_1():
    df['l52_high_low_pct_chg'] = df.groupby("ts_code").apply(custom_f, 52).values

method_1()

@timeit
def method_2():
    df['l52_high_low_pct_chg'] = df.groupby("ts_code").rolling(52).apply(rolling_f).values[:, 0]

method_2()

@timeit
def method_3():
    df['l52_high_low_pct_chg'] = df.groupby('ts_code').apply(np_ext_f, n=52).sort_index(level=1).values

method_3()
This gives us the following output:
func:'method_1' took: 2.5650 sec
func:'method_2' took: 15.1233 sec
func:'method_3' took: 0.1084 sec
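So the fastest method is numpy_ext, which makes sense because it is optimized for vectorized computation. The second fastest is the custom function I wrote, which is reasonably efficient because it does some vectorized computation while still doing some Pandas lookups. By far the slowest method is the Pandas rolling function.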
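If adding a third-party dependency is not an option, the same window-by-window computation on raw arrays can be sketched with NumPy alone, using numpy.lib.stride_tricks.sliding_window_view (available from NumPy 1.20). This is a swapped-in technique rather than one of the three methods timed above, and I have not benchmarked it; the function name rolling_pct_chg is hypothetical.

```python
import numpy as np
import pandas as pd
from numpy.lib.stride_tricks import sliding_window_view

def rolling_pct_chg(high, low, n):
    """Hypothetical helper: peak-aware percentage change over windows of n rows."""
    out = np.full(len(high), np.nan)
    if len(high) < n:  # group shorter than the window: everything stays NaN
        return out
    high_w = sliding_window_view(high, n)  # shape (len(high) - n + 1, n)
    low_w = sliding_window_view(low, n)
    for i, (h, l) in enumerate(zip(high_w, low_w)):
        # Minimum low from the peak of high (inclusive) to the end of the window
        out[i + n - 1] = 1 - l[np.argmax(h):].min() / h.max()
    return out

df = pd.DataFrame({
    "ts_code": ["A"] * 5 + ["B"] * 5,
    "high": [20, 30, 40, 50, 20, 50, 30, 40, 10, 20],
    "low": [10, 5, 20, 10, 30, 10, 5, 20, 10, 30],
})

for n in (3, 4):
    col = f"l{n}_high_low_pct_chg"
    df[col] = np.nan
    # Write each group's result back through its own index labels
    for _, g in df.groupby("ts_code"):
        df.loc[g.index, col] = rolling_pct_chg(
            g["high"].to_numpy(), g["low"].to_numpy(), n)

print(df)
```

Assigning through df.loc[g.index, col] avoids depending on the order in which groupby returns the groups, which the .values and sort_index(level=1) approach has to handle by hand.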
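For my solution, we will use .groupby("ts_code") followed by .rolling to process windows of a given size, together with a custom_function. Instead of applying a calculation directly to the values it receives, this custom function uses those values to query the original dataframe. We can then compute the value you expect by finding the row where "high" peaks, looking at that row and the rows after it to find the minimum "low", and finally applying your formula: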
def custom_function(group, df):
    # Query the original dataframe using the group values
    # (rolling passes them as floats, so cast back to integer labels)
    group = df.loc[group.values.astype(int)]
    # Calculate your formula
    high_peak_row = group["high"].idxmax()
    min_low_after_peak = group.loc[high_peak_row:, "low"].min()
    return 1 - min_low_after_peak / group.loc[high_peak_row, "high"]

# Reset the index to roll over that column and be able to query the original dataframe
df["l3_high_low_pct_chg"] = df.reset_index().groupby("ts_code")["index"].rolling(3).apply(custom_function, args=(df,)).values
df["l4_high_low_pct_chg"] = df.reset_index().groupby("ts_code")["index"].rolling(4).apply(custom_function, args=(df,)).values
Output:
ts_code high low l3_high_low_pct_chg l4_high_low_pct_chg
0 A 20 10 NaN NaN
1 A 30 5 NaN NaN
2 A 40 20 0.50 NaN
3 A 50 10 0.80 0.80
4 A 20 30 0.80 0.80
5 B 50 10 NaN NaN
6 B 30 5 NaN NaN
7 B 40 20 0.90 NaN
8 B 10 10 0.75 0.90
9 B 20 30 0.75 0.75
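The approach works because rolling(...).apply passes the window's contents to the callback, and here those contents are the original row labels. A small sketch of what the callback actually receives (the names record and seen are made up for the illustration):

```python
import pandas as pd

labels = pd.Series([0, 1, 2, 3], name="index")
seen = []

def record(window):
    # Store the window contents; rolling expects a single number back
    seen.append(window.tolist())
    return 0.0

labels.rolling(2).apply(record)
print(seen)
```

The labels arrive as floats, because rolling converts its input to float64 before calling the function, so casting them back to integers before the df.loc lookup is a reasonable precaution.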
We can take this idea further and build the grouping only once:
groups = df.reset_index().groupby("ts_code")["index"]
for n in [3, 4]:
    df[f"l{n}_high_low_pct_chg"] = groups.rolling(n).apply(custom_function, args=(df,)).values