如何让 .scan() 只聚合未完成序列中最近发出的数组

How to have .scan() only aggregate the most recent emitted array in an uncompleted sequence

我需要转换和聚合可观察数组中的元素。该序列是开放的,可能永远不会完成(基于网络)。

我目前正在使用以下代码:

const numbers = [1,2,3];
const numbers$ = new Rx.Subject();

const output = numbers$
  .flatMap(n => n)
  .map(n => n*n)
  .scan((acc, x) => acc.concat([x]), [])
  .subscribe(n => { console.log(n); });

numbers$.next(numbers);

setTimeout(() => {
  numbers$.next([5,6,7])
}, 1000);

目前发射了多个数组,目前最后发射的值为[1, 4, 9, 25, 36, 49]。但是,我只希望在同一输入数组 中 的那些值进行平方。

即我需要输出 observable 正好发出两个数组:[1,4,9][25, 36, 49].

如何操作?

这应该是您要找的:

const output = numbers$
    .map((a) => a.map(n => n * n))
    .subscribe(n => { console.log(n); });

编辑:如果你不想使用Array.map,你可以使用RxJS。

您可以将 Array.map 替换为对值进行平方并将它们缩减回数组的可观察对象。这可以扩展为包括 distinct 或其他 RxJS 运算符(根据您的评论):

const output = numbers$
    .mergeMap(
        (a) => Rx.Observable.from(a)
            .map((n) => n * n)
            .reduce((acc, n) => { acc.push(n); return acc; }, [])
    )
    .subscribe(n => { console.log(n); });

您发布的解决方案:

const output = numbers$
    .flatMap(n => n)
    .map(n => n*n)
    .buffer(numbers$.delay(1))
    .subscribe(n => { console.log(n); });

是基于时间的,数组没有被合并的唯一原因是第一次 next 调用和第二次调用之间的间隔超过一毫秒。

看起来这就是您需要的:jsbin

const numbers = [1,2,3];
const numbers$ = new Rx.Subject();

const output = numbers$
  .map(numbers => numbers.map(n => n * n))
  .scan((acc, x) => acc.concat([x]), [])
  .subscribe(n => { console.log(n); });

numbers$.next(numbers);

setTimeout(() => {
  numbers$.next([5,6,7])
}, 1000);