ReactiveX:从 observable 中过滤掉数字,使总数永远不会低于零

ReactiveX: filter out numbers from observable so that total never goes below zero

以下 RxJs 代码显示输入列表中的数字和 运行 总数:

let inputs = [10, 10, -30, 10, -30, 50, -25, -30, 10]

let obs$ = Rx.Observable.from(inputs)
let totals$ = obs$.scan((acc, next) => acc + next, 0)
obs$.subscribe(x => console.log('x:', x));
totals$.subscribe(x => console.log("total:", x))

totals$ 将发出:10、20、-10、0、-30...

我想以某种方式转换 obs$ observable,以便生成的 totals$ 永远不会发出负数。

即,在这种情况下,应过滤掉第一个和最后一个“-30”:

otherObs$: 10, 10, 10, -30, 50, -25, 10

totals$: 10, 20, 30, 0, 50, 25, 35

edit:请注意,我对修改后的 otherObs$ observable 感兴趣,即原始输入数字的序列,其中一些被过滤掉(obs$ 实际上将包含更多数据,所以我确实需要原始元素;"value" 只是过滤的关键)。这里的 totals$ 只是 用于显示正在发生的事情。

您可以通过将该逻辑合并到您的 reduce 函数中轻松实现这一点,也就是在 .scan():

let total$ = obs$.scan((acc, next) => {
    if (acc + next < 0) { //less than zero, don't take in the current number
        return acc;
    }
    return acc + next; //all is good, continue to add next
}, 0)

甚至在一个班轮中:

let totals$ = obs$.scan((acc, next) => (acc + next < 0 ? acc : acc + next), 0)

如果你想保留每一个值并仍然检查总和,那么别无选择,只能将每个序列存储在数组中:

let other$ =
    obs$
        .scan((acc, next) => {
            let sum = acc.reduce((a,b)=>a+b);
            if((sum+next)<0){
                return acc; // less than zero, return current arrray
            }
            return  acc.push(next); //greater or equal to zero, add current item to array
        }, [...0])
        .switchMap(array => Observable.from(array))

根据@CozyAzure 的思路,我想出了一个更好的答案。我只需要在 reducer 中存储更多的状态:原始值,累计总数,以及这个值是否被添加到总数中。

let other$ = obs$
  .scan(({value, total, valid}, next) => {
     if (total + next < 0)
       return { value: next, total: total, valid: false }
     else
       return { value: next, total: total + next, valid: true }
  }, { value: undefined, total: 0, valid: false })
  .filter(x => x.valid)
  .map(x => x.value)

这样,我可以过滤掉添加的值,并取回添加的原始值。