Pandas: Time-weighted rolling average on ragged time series
I have a time-indexed DataFrame with a ragged (meaning irregular-frequency) index, on which I want to perform a time-weighted rolling average that keeps the DataFrame's original index. A recorded value is assumed to be valid until it is superseded by the next one. One way to achieve this is to up-sample the ragged DataFrame to a uniform frequency and then do a rolling mean:
import pandas as pd
import numpy as np


def time_weighted_average_using_upsampling(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:
    # Leads to high memory usage
    original_index = df.index.copy()
    avg = (
        df.resample("1s")
        .ffill()
        .rolling(avg_window, closed="left", min_periods=int(avg_window[0]))
        .mean()
        .reindex(original_index)
    )
    return avg
if __name__ == "__main__":
    df = pd.DataFrame(
        {"A": [0, 1, 2, 3, 4, 5]},
        index=[
            pd.Timestamp("20130101 09:00:00"),
            pd.Timestamp("20130101 09:00:02"),
            pd.Timestamp("20130101 09:00:03"),
            pd.Timestamp("20130101 09:00:05"),
            pd.Timestamp("20130101 09:00:06"),
            pd.Timestamp("20130101 09:00:10"),
        ],
    )
    expected_avg = pd.DataFrame(
        {"A": [np.nan, np.nan, 1 / 3, 5 / 3, 7 / 3, 4]},
        index=[
            pd.Timestamp("20130101 09:00:00"),
            pd.Timestamp("20130101 09:00:02"),
            pd.Timestamp("20130101 09:00:03"),
            pd.Timestamp("20130101 09:00:05"),
            pd.Timestamp("20130101 09:00:06"),
            pd.Timestamp("20130101 09:00:10"),
        ],
    )
    pd.testing.assert_frame_equal(
        time_weighted_average_using_upsampling(df=df, avg_window="3s"), expected_avg
    )
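To see where the expected values come from, here is a hand computation of the first non-NaN entry (a small sketch using only the numbers above): the left-closed 3s window ending at 09:00:03 covers [09:00:00, 09:00:03), in which the value 0 is valid for 2 seconds and the value 1 for 1 second.

# Time-weighted mean over [09:00:00, 09:00:03), closed on the left:
# value 0 holds for 2 s, value 1 holds for 1 s
manual = (0 * 2 + 1 * 1) / 3
assert manual == 1 / 3  # matches expected_avg at 09:00:03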
The problem with this is that the up-sampling defeats the purpose of the sparse representation that the ragged df provides. A sparse representation is memory-efficient, while the up-sampled version is not. That raises the question: how can one achieve the result shown above without having to up-sample the entire df?
Here is an alternative: instead of up-sampling the whole DataFrame, you can first check where the time difference between two consecutive rows is greater than the gap. Then subtract the 3s from the rows that have such a gap and reindex df with the union of these specific new timestamps. Once those rows are created, you can groupby using the positions where the new indexes were added, resample each group at 1s, and finally do the rolling mean the way you did. Reindex with df at the end.
rule = 3
rolling_win = f'{rule}s'

# find where the gap to the previous row is at least `rule` seconds
sparse = df.index.to_series().diff().dt.total_seconds().ge(rule)
new_timestamps = df.index[sparse] - pd.Timedelta(seconds=rule)
print(new_timestamps)
# DatetimeIndex(['2013-01-01 09:00:07'], dtype='datetime64[ns]', freq=None)

# reindex with the new timestamps
df_ = df.reindex(df.index.union(new_timestamps))

# first resample at 1s per group, then clean the dataframe to do the rolling mean,
# finally reindex like the original df
df_ = (df_.groupby(df_.index.isin(new_timestamps).cumsum())
          .resample("1s").ffill()
          .reset_index(level=0, drop=True).ffill()
          .rolling(rolling_win, closed="left", min_periods=rule)
          .mean()
          .reindex(df.index)
      )
print(df_)
                            A
2013-01-01 09:00:00       NaN
2013-01-01 09:00:02       NaN
2013-01-01 09:00:03  0.333333
2013-01-01 09:00:05  1.666667
2013-01-01 09:00:06  2.333333
2013-01-01 09:00:10  4.000000
In this case it is not very interesting because the gaps are actually small, but if the gaps were large it would become useful.
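To illustrate the large-gap case (a hypothetical sketch, not part of the original answer, reusing the rule defined above): with two points an hour apart, full 1s up-sampling would materialize around 3,600 rows, while the gap-aware approach inserts a single extra timestamp.

# hypothetical wide-gap example: two points one hour apart
wide = pd.DataFrame(
    {"A": [0.0, 1.0]},
    index=[pd.Timestamp("20130101 09:00:00"), pd.Timestamp("20130101 10:00:00")],
)
gaps = wide.index.to_series().diff().dt.total_seconds().ge(rule)
print(wide.index[gaps] - pd.Timedelta(seconds=rule))
# DatetimeIndex(['2013-01-01 09:59:57'], dtype='datetime64[ns]', freq=None)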
EDIT: or another option, probably better: union all the indexes created from the original index minus 1s, 2s, 3s, ... (depending on the rule). Now you have only the indexes needed for the rolling, so reindex, ffill and rolling.mean. The result is the same at the end.
from functools import reduce

rule = 3
rolling_win = f'{rule}s'
idx = df.index

# union of the original index shifted back by 0..rule seconds,
# so that every rolling window has the support points it needs
df_ = (df.reindex(reduce(lambda x, y: x.union(y),
                         [idx - pd.Timedelta(seconds=i)
                          for i in range(0, rule + 1)]))
         .ffill()
         .rolling(rolling_win, closed="left", min_periods=rule)
         .mean()
         .reindex(df.index)
      )
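As a quick sanity check (assuming the df and expected_avg from the question are still in scope), the result should match the expected output:

pd.testing.assert_frame_equal(df_, expected_avg)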
Two possible solutions inspired by @Ben.T:
def time_weighted_average_using_local_upsampling(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:
    """Uses second resolution up-sampling only on smaller windows at a time."""
    original_index = df.index.copy()
    avg = (
        df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")
        .rolling(avg_window, closed="both", min_periods=2)
        .apply(lambda x: x.resample("1s").ffill()[:-1].mean(skipna=False))
        .reindex(original_index)
    )
    return avg


def time_weighted_average_using_index_weighting(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:
    """Uses weighting by duration, by ensuring every window has a point at the start."""
    original_index = df.index.copy()
    avg = (
        df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")
        .rolling(avg_window, closed="both", min_periods=2)
        .apply(lambda x: np.average(x[:-1], weights=x.index.to_series().diff()[1:].dt.seconds))
        .reindex(original_index)
    )
    return avg
The first one up-samples a single rolling window at a time, while the latter actually does the ragged time-weighted average, by ensuring there is always a point at the beginning of every window we care about. This is done by including the original index shifted back by the window length.
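A quick correctness check against the question's expected output could look like this (a sketch, assuming df and expected_avg from the question are in scope):

for fn in (time_weighted_average_using_local_upsampling,
           time_weighted_average_using_index_weighting):
    pd.testing.assert_frame_equal(fn(df=df, avg_window="3s"), expected_avg)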
I have not yet measured performance on the relevant cases.
EDIT: I decided to test the functions on a second-resolution dataset of about 100,000 rows, using 20-minute windows(!). Both variants above were unbearably slow, but I think I have a new winner:
def time_weighted_average_using_index_weighting2(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:
    """Uses weighting by duration, by ensuring every window has a point at the start."""
    original_index = df.index.copy()
    avg = df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")
    avg = (
        avg.multiply(avg.index.to_series().diff().dt.seconds.shift(-1), axis=0)
        .divide(pd.Timedelta(avg_window).seconds)
        .rolling(avg_window, closed="left")
        .sum()
        .reindex(original_index)
    )
    avg[~((avg.index - pd.Timedelta(avg_window)) >= original_index[0])] = np.nan
    return avg
This one pre-weights the values before the rolling, so we get away with .sum() instead of apply(). That translates into a huge speed boost. We also at most double the index, regardless of the size of the averaging window.
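On the small example from the question, the fast variant should still reproduce the expected output (a sketch, assuming df and expected_avg are in scope):

pd.testing.assert_frame_equal(
    time_weighted_average_using_index_weighting2(df=df, avg_window="3s"),
    expected_avg,
)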