计算一段时间内的事件并在 RxJS 中每秒产生一次总和
Count events over a period of time and yield the sum once every second in RxJS
是否可以在 RxJS 中计算一段时间内的事件并每秒产生一次总和?我有一个连续不断的事件流。每 1 秒我想获得过去 5 分钟内的事件总数 window。这个想法是用它来填充实时图表。
我知道如何以传统方式做到这一点,但真的很想了解它是如何通过反应式编程完成的。
以下是我的处理方法。
创建一个只计算接收到的事件数量的可观察对象,并将其作为 运行 总数发出(通过 scan
)。
创建第二个 observable,它只是 运行 总延迟 5 分钟。
创建第三个 observable,它只是从第一个 observable 中减去延迟的 observable。这将产生 运行 小于 5 分钟的事件总数。
创建一个最终的可观察对象,每秒对该第三个可观察对象进行一次采样。
const totalLast5Minutes = eventSource.publish(events => {
const runningTotal = events
.scan((e, total) => total + 1, 0)
.startWith(0);
const totalDelayed5Minutes = runningTotal
.delay(5000 * 60)
.startWith(0);
return Rx.Observable
.combineLatest(total, totalDelayed5Minutes, (t, td) => t - td);
});
// only sample the value once per second
Rx.Observable
.interval(1000)
.withLatestFrom(totalLast5Minutes, (interval, total) => total)
.subscribe(total => console.log(`total=${total}`));
这是另一种方法,我认为它可能更简单一些。您可以使用 windowWithTime - 请参阅 ReactiveX and RxJS doc。您可以创建重叠的 windows,这样您就可以有一个 5 分钟 window 的事件,然后是另一个 5 分钟 window 的事件,该事件在一秒后开始,然后再一秒等
如果您对每个 windows 进行计数,您将得到一系列 5 分钟事件的计数,间隔一秒。它看起来像这样:
source.windowWithTime(5 * 60000, 1000) // create 5 minute windows, one second apart
.flatMap(window => window.count()) // take each window and get the count once it completes (after 5 minutes)
.subscribe(count => console.log(count));
我已经用它来完成您所说的事情 - 在不间断的事件流中获取 window 时间内的事件发生率。通过 windows 重叠,您将在流中生成 activity 的 moving average。
是否可以在 RxJS 中计算一段时间内的事件并每秒产生一次总和?我有一个连续不断的事件流。每 1 秒我想获得过去 5 分钟内的事件总数 window。这个想法是用它来填充实时图表。
我知道如何以传统方式做到这一点,但真的很想了解它是如何通过反应式编程完成的。
以下是我的处理方法。
创建一个只计算接收到的事件数量的可观察对象,并将其作为 运行 总数发出(通过 scan
)。
创建第二个 observable,它只是 运行 总延迟 5 分钟。
创建第三个 observable,它只是从第一个 observable 中减去延迟的 observable。这将产生 运行 小于 5 分钟的事件总数。
创建一个最终的可观察对象,每秒对该第三个可观察对象进行一次采样。
const totalLast5Minutes = eventSource.publish(events => {
const runningTotal = events
.scan((e, total) => total + 1, 0)
.startWith(0);
const totalDelayed5Minutes = runningTotal
.delay(5000 * 60)
.startWith(0);
return Rx.Observable
.combineLatest(total, totalDelayed5Minutes, (t, td) => t - td);
});
// only sample the value once per second
Rx.Observable
.interval(1000)
.withLatestFrom(totalLast5Minutes, (interval, total) => total)
.subscribe(total => console.log(`total=${total}`));
这是另一种方法,我认为它可能更简单一些。您可以使用 windowWithTime - 请参阅 ReactiveX and RxJS doc。您可以创建重叠的 windows,这样您就可以有一个 5 分钟 window 的事件,然后是另一个 5 分钟 window 的事件,该事件在一秒后开始,然后再一秒等
如果您对每个 windows 进行计数,您将得到一系列 5 分钟事件的计数,间隔一秒。它看起来像这样:
source.windowWithTime(5 * 60000, 1000) // create 5 minute windows, one second apart
.flatMap(window => window.count()) // take each window and get the count once it completes (after 5 minutes)
.subscribe(count => console.log(count));
我已经用它来完成您所说的事情 - 在不间断的事件流中获取 window 时间内的事件发生率。通过 windows 重叠,您将在流中生成 activity 的 moving average。