基于时间戳使用 ReactiveX 实现移动平均线

Implementing moving average with ReactiveX based on timestamps

我正在使用 Rx.Net 并且我有 Observable 可以发出时间序列点(双精度,时间戳)。每次新点到达时,我都想计算最近 30 秒的平均值。我想我需要某种重叠 Window/Buffer 不是基于计数而是基于时间戳。

我找到了 this 与 SlidingWindow 实现有关的主题,但我无法弄清楚如何将其用于我的问题。

编辑:

感谢 this 我了解到我可以使用 Scan 运算符并缓冲我的点,所以基本上这解决了问题。但也许有更好的方法来做到这一点?

BufferWindow 向前看,你想要回顾的东西。 Scan是最好的起点:

public static IObservable<List<T>> BackBuffer<T>(this IObservable<T> source, TimeSpan ts)
{
    return BackBuffer(source, ts, Scheduler.Default);
}
public static IObservable<List<T>> BackBuffer<T>(this IObservable<T> source, TimeSpan ts, IScheduler scheduler)
{
    return source
        .Timestamp()
        .Scan(new List<Timestamped<T>>(), (list, element) => list
            .Where(ti => scheduler.Now - ti.Timestamp <= ts)
            .Concat(Enumerable.Repeat(element, 1))
            .ToList()
        )
        .Select(list => list.Select(t => t.Value).ToList());
}

一旦你有了 BackBuffer 或类似的东西,剩下的就变得简单了:

source
    .BackBuffer(TimeSpan.FromMilliseconds(70))
    .Select(list => list.Average())
    .Subscribe(average => Console.WriteLine(average));