为每个订阅在扫描运算符上创建一个新的种子对象

Create a new seed object on scan operator for every subscription

RxJS 5.5.2

我有以下代码将数字数组拆分为具有 2 个属性的对象 'small' 对于小于 4 和 的数字'big' 其余。

const o = from([1, 2, 3, 4, 5, 6]).pipe(
  scan<number, {}>((a, b) => {
    if (b < 4) {
      a['small'].push(b);
    } else {
      a['big'].push(b);
    }
    return a;
  }, {
    'small': [],
    'big': []
  })
);
console.log('subscription 1');
o.subscribe(x => console.log(JSON.stringify(x)));
console.log('subscription 2');
o.subscribe(x => console.log(JSON.stringify(x)));

订阅后 1 个控制台打印:

{"small":[1,2,3],"big":[4,5,6]} // this is ok

订阅 2 控制台打印后:

{"small":[1,2,3,1,2,3],"big":[4,5,6,4,5,6]} // this is not ok

有没有办法在每次有人订阅时从一个新的种子对象开始?

扫描累加器 ({ small: [], big: [] }) 变异为 .push,这是一种反模式,很容易导致意外行为。

防止更改先前发出的值的一个选项可能是:

scan<number, {}>((a, b) => {
  if (b < 4) {
    return Object.assign({}, a, {small: a.small.concat([b])});
  } else {
    return Object.assign({}, a, {big: a.big.concat([b])}); 
  }
}, {
  'small': [],
  'big': []
})

不确定您到底想完成什么,但可能值得看一下 partition 运算符,它会产生两个独立的值流,例如 const [small, big] = someStream.partition(x => x < 4);.

另一种选择是将管道包装在 defer 块中,这将在订阅时重建源流。

defer(() =>
  from([1, 2, 3, 4, 5, 6]).pipe(
    scan<number, {}>((a, b) => {
      if (b < 4) {
        a['small'].push(b);
      } else {
        a['big'].push(b);
      }
      return a;
    }, {
      'small': [],
      'big': []
    })
  )
);

每次订阅都会调用 defer 块中的方法并订阅结果。尽管正如@arturgrzesiak 所提到的,变异数组被视为函数式编程中的反模式,并被扩展为函数式反应式编程。