突发数据的反应性积累

Reactive accumulation of data coming in bursts

问题:存在数值流。值以突发方式推送,因此 100 个值可以彼此非常接近(时间上),比如每 5-10 毫秒,然后它 possibly 停止一段时间,然后可以突发again.The 想法是显示在 most 长度为 500 毫秒的 windows 的累加值(总和)。

我的第一次尝试是使用 Buffer(500ms),但这会导致总和为 0 的事件(每 500 毫秒)持续抽取(作为累积的缓冲区项 os 0),它可以被修复通过空缓冲区进行过滤,但我真的很想完全避免这种情况,并且仅在 "silence".

一段时间后实际推送值后才打开缓冲

其他限制:实现是 UniRx,它不包含所有 Rx 运算符,特别是 Window(我怀疑在这种情况下可能有用),因此解决方案仅限于包括 Buffer 在内的基本运算符。

因为你只想要总和,所以使用 Buffer 是多余的。 我们可以 运行 一个 ScanAggregation.

  var burstSum =
    source
        .Scan(0, (acc, current) => acc + current)
        .Throttle(TimeSpan.FromMilliseconds(500))
        .Take(1)
        .Repeat();

这将启动一个流,该流累积总和,直到流空闲至少 500 毫秒。

但是如果我们想至少在每个时间桶中发出,我们就必须走不同的路。我们做出两个假设:

  1. 元素之间的时间间隔之和应等于第一个元素和最后一个元素之间的时间间隔。
  2. Throttle 将在流完成时释放最后一个值。

    source
        .TimeInterval()
        .Scan((acc, cur) => new TimeInterval<int>(acc.Value + cur.Value, acc.Interval + cur.Interval))
        .TakeWhile(acc => acc.Interval <= TimeSpan.FromMilliseconds(500))
        .Throttle(TimeSpan.FromMilliseconds(500))
        .Select(acc => acc.Value)
        .Take(1)
        .Repeat();