计算一段时间内的事件并在 RxJS 中每秒产生一次总和

Count events over a period of time and yield the sum once every second in RxJS

是否可以在 RxJS 中计算一段时间内的事件并每秒产生一次总和?我有一个连续不断的事件流。每 1 秒我想获得过去 5 分钟内的事件总数 window。这个想法是用它来填充实时图表。

我知道如何以传统方式做到这一点,但真的很想了解它是如何通过反应式编程完成的。

以下是我的处理方法。

创建一个只计算接收到的事件数量的可观察对象,并将其作为 运行 总数发出(通过 scan)。 创建第二个 observable,它只是 运行 总延迟 5 分钟。 创建第三个 observable,它只是从第一个 observable 中减去延迟的 observable。这将产生 运行 小于 5 分钟的事件总数。 创建一个最终的可观察对象,每秒对该第三个可观察对象进行一次采样。

const totalLast5Minutes = eventSource.publish(events => {
    const runningTotal = events
        .scan((e, total) => total + 1, 0)
        .startWith(0);
    const totalDelayed5Minutes = runningTotal
        .delay(5000 * 60)
        .startWith(0);
    return Rx.Observable
        .combineLatest(total, totalDelayed5Minutes, (t, td) => t - td);
});

// only sample the value once per second
Rx.Observable
    .interval(1000)
    .withLatestFrom(totalLast5Minutes, (interval, total) => total)
    .subscribe(total => console.log(`total=${total}`));

这是另一种方法,我认为它可能更简单一些。您可以使用 windowWithTime - 请参阅 ReactiveX and RxJS doc。您可以创建重叠的 windows,这样您就可以有一个 5 分钟 window 的事件,然后是另一个 5 分钟 window 的事件,该事件在一秒后开始,然后再一秒等

如果您对每个 windows 进行计数,您将得到一系列 5 分钟事件的计数,间隔一秒。它看起来像这样:

source.windowWithTime(5 * 60000, 1000)   // create 5 minute windows, one second apart
      .flatMap(window => window.count()) // take each window and get the count once it completes (after 5 minutes)
      .subscribe(count => console.log(count));

我已经用它来完成您所说的事情 - 在不间断的事件流中获取 window 时间内的事件发生率。通过 windows 重叠,您将在流中生成 activity 的 moving average