基于时间戳使用 ReactiveX 实现移动平均线
Implementing moving average with ReactiveX based on timestamps
我正在使用 Rx.Net 并且我有 Observable 可以发出时间序列点(双精度,时间戳)。每次新点到达时,我都想计算最近 30 秒的平均值。我想我需要某种重叠 Window/Buffer 不是基于计数而是基于时间戳。
我找到了 this 与 SlidingWindow 实现有关的主题,但我无法弄清楚如何将其用于我的问题。
编辑:
感谢 this 我了解到我可以使用 Scan 运算符并缓冲我的点,所以基本上这解决了问题。但也许有更好的方法来做到这一点?
Buffer
和 Window
向前看,你想要回顾的东西。 Scan
是最好的起点:
public static IObservable<List<T>> BackBuffer<T>(this IObservable<T> source, TimeSpan ts)
{
return BackBuffer(source, ts, Scheduler.Default);
}
public static IObservable<List<T>> BackBuffer<T>(this IObservable<T> source, TimeSpan ts, IScheduler scheduler)
{
return source
.Timestamp()
.Scan(new List<Timestamped<T>>(), (list, element) => list
.Where(ti => scheduler.Now - ti.Timestamp <= ts)
.Concat(Enumerable.Repeat(element, 1))
.ToList()
)
.Select(list => list.Select(t => t.Value).ToList());
}
一旦你有了 BackBuffer
或类似的东西,剩下的就变得简单了:
source
.BackBuffer(TimeSpan.FromMilliseconds(70))
.Select(list => list.Average())
.Subscribe(average => Console.WriteLine(average));
我正在使用 Rx.Net 并且我有 Observable 可以发出时间序列点(双精度,时间戳)。每次新点到达时,我都想计算最近 30 秒的平均值。我想我需要某种重叠 Window/Buffer 不是基于计数而是基于时间戳。
我找到了 this 与 SlidingWindow 实现有关的主题,但我无法弄清楚如何将其用于我的问题。
编辑:
感谢 this 我了解到我可以使用 Scan 运算符并缓冲我的点,所以基本上这解决了问题。但也许有更好的方法来做到这一点?
Buffer
和 Window
向前看,你想要回顾的东西。 Scan
是最好的起点:
public static IObservable<List<T>> BackBuffer<T>(this IObservable<T> source, TimeSpan ts)
{
return BackBuffer(source, ts, Scheduler.Default);
}
public static IObservable<List<T>> BackBuffer<T>(this IObservable<T> source, TimeSpan ts, IScheduler scheduler)
{
return source
.Timestamp()
.Scan(new List<Timestamped<T>>(), (list, element) => list
.Where(ti => scheduler.Now - ti.Timestamp <= ts)
.Concat(Enumerable.Repeat(element, 1))
.ToList()
)
.Select(list => list.Select(t => t.Value).ToList());
}
一旦你有了 BackBuffer
或类似的东西,剩下的就变得简单了:
source
.BackBuffer(TimeSpan.FromMilliseconds(70))
.Select(list => list.Average())
.Subscribe(average => Console.WriteLine(average));