RxJS 在组时间获取排放数组。 Firebase 触发器

RxJS get array of emissions at group time. Firebase triggers

我是 RxJS 世界的新手,对此我有点不知所措。希望有人能帮帮我。

我有一个 source observable(通过 AngularFire 的 Firebase),它在随机的派克时间不断向我发出大量数据(2 秒内多达 50 或 80 次排放 window),因为这是一个正在放缓的过程根据我的项目绩效,我认为处理此问题的正确方法是将排放量分组到一个数组中,然后对收到的所有数据进行交易并将其插入商店。

我正在寻找的结果如下所示:

考虑到我将放置 "hold" 3 秒的时间,我想要以下结果:

           1s     1.5s
--> 30 --> 60 --> 100

          1s           2s
--> 5 --> 1 --> 50 --> 70
  1. [30, 60, 100] --> 1.5s间隔时间
  2. [5, 1, 50, 70] --> 2s间隔时间

数组中的值将是从收到的第一个排放开始的特定时间收到的排放。在该特定时间后,它将 "restart" 在下一批排放中初始化(实际上可能在 1 秒或 2 小时内,但随后,间隔将触发 2 秒的排放)

到目前为止我已经尝试过使用 Window 和 Buffer,也许我没有正确使用它们或者我只是愚蠢但我找不到得到我刚刚解释的结果。

filter((snapshot) => { if (snapshot.payload.val().reference) { return snapshot; } }),
window(interval(2000)),
mergeAll(),
withTransaction((snapshots:[]) => {
   snapshots.forEach(snapshot => {
     if (snapshot.type === 'child_changed') {

       this.store.add(snapshot.key, snapshot.val());

     } else if (snapshot.type === 'child_changed') {

       this.store.replace(snapshot.key, snapshot.val());

     } else if (snapshot.type === 'child_removed') {

       this.store.remove(snapshot.key);

     }
   })
})

我什至不知道 RxJS 是否可行(我猜是这样。我已经看到很多很酷的东西)但是任何建议或指南都将不胜感激。

非常感谢!

注意:withTransaction 是自定义运算符。

不确定你想要什么,但看起来你想要 bufferTime?

const source = timer(0, 500);
const buffered = source.pipe(bufferTime(3000));
buffered.subscribe(val => console.log(val));

这将每 3 秒将缓冲期内收集的所有值作为数组发出。

闪电战演示:https://stackblitz.com/edit/rxjs-vpu97e?file=index.ts

在您的示例中,我认为您只需将其用作:

filter((snapshot) => snapshot.payload.val().reference), // this is all you need for filter
bufferTime(2000),
withTransaction((snapshots:[]) => {
   ...
})